diff --git a/crates/sui-graphql-e2e-tests/tests/stable/prune/prune.exp b/crates/sui-graphql-e2e-tests/tests/stable/prune/prune.exp index 53b30cad5d1b5..c9bb5015c0e70 100644 --- a/crates/sui-graphql-e2e-tests/tests/stable/prune/prune.exp +++ b/crates/sui-graphql-e2e-tests/tests/stable/prune/prune.exp @@ -48,10 +48,13 @@ task 10, line 43: //# advance-epoch Epoch advanced: 2 -task 11, lines 45-55: +task 11, lines 45-58: //# run-graphql --wait-for-checkpoint-pruned 4 Response: { "data": { + "epoch": { + "epochId": 2 + }, "checkpoints": { "nodes": [ { @@ -71,7 +74,7 @@ Response: { } } -task 12, lines 57-67: +task 12, lines 60-70: //# run-graphql Response: { "data": { @@ -86,7 +89,7 @@ Response: { } } -task 13, lines 69-72: +task 13, lines 72-75: //# run-graphql Response: { "data": { @@ -94,7 +97,7 @@ Response: { } } -task 14, lines 74-103: +task 14, lines 77-106: //# run-graphql Response: { "data": { diff --git a/crates/sui-graphql-e2e-tests/tests/stable/prune/prune.move b/crates/sui-graphql-e2e-tests/tests/stable/prune/prune.move index 025dd4190a004..b6a611469aaf9 100644 --- a/crates/sui-graphql-e2e-tests/tests/stable/prune/prune.move +++ b/crates/sui-graphql-e2e-tests/tests/stable/prune/prune.move @@ -44,6 +44,9 @@ module Test::M1 { //# run-graphql --wait-for-checkpoint-pruned 4 { + epoch { + epochId + } checkpoints { nodes { epoch { diff --git a/crates/sui-graphql-e2e-tests/tests/stable/prune/transactions.exp b/crates/sui-graphql-e2e-tests/tests/stable/prune/transactions.exp new file mode 100644 index 0000000000000..b5a2038fb291a --- /dev/null +++ b/crates/sui-graphql-e2e-tests/tests/stable/prune/transactions.exp @@ -0,0 +1,450 @@ +processed 25 tasks + +init: +A: object(0,0) + +task 1, lines 6-25: +//# publish +created: object(1,0) +mutated: object(0,1) +gas summary: computation_cost: 1000000, storage_cost: 5570800, storage_rebate: 0, non_refundable_storage_fee: 0 + +task 2, line 27: +//# run Test::M1::create --sender A --args 0 @A +created: object(2,0) +mutated: object(0,0) +gas summary: computation_cost: 1000000, storage_cost: 2302800, storage_rebate: 0, non_refundable_storage_fee: 0 + +task 3, line 29: +//# create-checkpoint +Checkpoint created: 1 + +task 4, line 31: +//# advance-epoch +Epoch advanced: 0 + +task 5, line 33: +//# run Test::M1::create --sender A --args 1 @A +created: object(5,0) +mutated: object(0,0) +gas summary: computation_cost: 1000000, storage_cost: 2302800, storage_rebate: 978120, non_refundable_storage_fee: 9880 + +task 6, line 35: +//# create-checkpoint +Checkpoint created: 3 + +task 7, line 37: +//# advance-epoch +Epoch advanced: 1 + +task 8, line 39: +//# run Test::M1::create --sender A --args 2 @A +created: object(8,0) +mutated: object(0,0) +gas summary: computation_cost: 1000000, storage_cost: 2302800, storage_rebate: 978120, non_refundable_storage_fee: 9880 + +task 9, line 41: +//# create-checkpoint +Checkpoint created: 5 + +task 10, line 43: +//# advance-epoch +Epoch advanced: 2 + +task 11, line 45: +//# run Test::M1::create --sender A --args 3 @A +created: object(11,0) +mutated: object(0,0) +gas summary: computation_cost: 1000000, storage_cost: 2302800, storage_rebate: 978120, non_refundable_storage_fee: 9880 + +task 12, line 47: +//# create-checkpoint +Checkpoint created: 7 + +task 13, lines 49-96: +//# run-graphql --wait-for-checkpoint-pruned 4 +Response: { + "data": { + "epoch": { + "epochId": 3 + }, + "checkpoints": { + "nodes": [ + { + "epoch": { + "epochId": 2 + }, + "sequenceNumber": 5 + }, + { + "epoch": { + "epochId": 2 + }, + "sequenceNumber": 6 + }, + { + "epoch": { + "epochId": 3 + }, + "sequenceNumber": 7 + } + ] + }, + "unfiltered": { + "pageInfo": { + "startCursor": "eyJjIjo3LCJ0Ijo2LCJpIjpmYWxzZX0", + "endCursor": "eyJjIjo3LCJ0Ijo4LCJpIjpmYWxzZX0", + "hasPreviousPage": false, + "hasNextPage": false + }, + "nodes": [ + { + "digest": "2Ba4H9L5Lgqvf7j42ZQt1io3EZq4QzRVMX9KQcHVi3fY", + "effects": { + "checkpoint": { + "sequenceNumber": 5 + } + } + }, + { + "digest": "3JTu6zCWqr6ntrcXes17pVGaDsi5TohbZG3KmJMB69wd", + "effects": { + "checkpoint": { + "sequenceNumber": 6 + } + } + }, + { + "digest": "6d7jabN5eUfM7okBxvBZM39HAjAB7tmUuSFv7Yzs5P68", + "effects": { + "checkpoint": { + "sequenceNumber": 7 + } + } + } + ] + }, + "transactionBlocks": { + "pageInfo": { + "startCursor": "eyJjIjo3LCJ0Ijo2LCJpIjpmYWxzZX0", + "endCursor": "eyJjIjo3LCJ0Ijo4LCJpIjpmYWxzZX0", + "hasPreviousPage": false, + "hasNextPage": false + }, + "nodes": [ + { + "digest": "2Ba4H9L5Lgqvf7j42ZQt1io3EZq4QzRVMX9KQcHVi3fY", + "effects": { + "checkpoint": { + "sequenceNumber": 5 + } + } + }, + { + "digest": "6d7jabN5eUfM7okBxvBZM39HAjAB7tmUuSFv7Yzs5P68", + "effects": { + "checkpoint": { + "sequenceNumber": 7 + } + } + } + ] + } + } +} + +task 14, lines 98-117: +//# run-graphql --wait-for-checkpoint-pruned 4 +Response: { + "data": { + "transactionBlocks": { + "pageInfo": { + "startCursor": "eyJjIjo3LCJ0Ijo4LCJpIjpmYWxzZX0", + "endCursor": "eyJjIjo3LCJ0Ijo4LCJpIjpmYWxzZX0", + "hasPreviousPage": false, + "hasNextPage": false + }, + "nodes": [ + { + "digest": "6d7jabN5eUfM7okBxvBZM39HAjAB7tmUuSFv7Yzs5P68", + "effects": { + "checkpoint": { + "sequenceNumber": 7 + } + } + } + ] + } + } +} + +task 15, lines 119-138: +//# run-graphql --wait-for-checkpoint-pruned 4 +Response: { + "data": { + "transactionBlocks": { + "pageInfo": { + "startCursor": "eyJjIjo3LCJ0Ijo2LCJpIjpmYWxzZX0", + "endCursor": "eyJjIjo3LCJ0Ijo2LCJpIjpmYWxzZX0", + "hasPreviousPage": false, + "hasNextPage": false + }, + "nodes": [ + { + "digest": "2Ba4H9L5Lgqvf7j42ZQt1io3EZq4QzRVMX9KQcHVi3fY", + "effects": { + "checkpoint": { + "sequenceNumber": 5 + } + } + } + ] + } + } +} + +task 16, lines 142-161: +//# run-graphql --wait-for-checkpoint-pruned 4 +Response: { + "data": { + "transactionBlocks": { + "pageInfo": { + "startCursor": null, + "endCursor": null, + "hasPreviousPage": false, + "hasNextPage": false + }, + "nodes": [] + } + } +} + +task 17, lines 163-182: +//# run-graphql --wait-for-checkpoint-pruned 4 +Response: { + "data": { + "transactionBlocks": { + "pageInfo": { + "startCursor": null, + "endCursor": null, + "hasPreviousPage": false, + "hasNextPage": false + }, + "nodes": [] + } + } +} + +task 18, lines 185-204: +//# run-graphql +Response: { + "data": { + "transactionBlocks": { + "pageInfo": { + "startCursor": "eyJjIjo3LCJ0Ijo2LCJpIjpmYWxzZX0", + "endCursor": "eyJjIjo3LCJ0Ijo4LCJpIjpmYWxzZX0", + "hasPreviousPage": false, + "hasNextPage": false + }, + "nodes": [ + { + "digest": "2Ba4H9L5Lgqvf7j42ZQt1io3EZq4QzRVMX9KQcHVi3fY", + "effects": { + "checkpoint": { + "sequenceNumber": 5 + } + } + }, + { + "digest": "6d7jabN5eUfM7okBxvBZM39HAjAB7tmUuSFv7Yzs5P68", + "effects": { + "checkpoint": { + "sequenceNumber": 7 + } + } + } + ] + } + } +} + +task 19, lines 206-225: +//# run-graphql +Response: { + "data": { + "transactionBlocks": { + "pageInfo": { + "startCursor": "eyJjIjo3LCJ0Ijo4LCJpIjpmYWxzZX0", + "endCursor": "eyJjIjo3LCJ0Ijo4LCJpIjpmYWxzZX0", + "hasPreviousPage": false, + "hasNextPage": false + }, + "nodes": [ + { + "digest": "6d7jabN5eUfM7okBxvBZM39HAjAB7tmUuSFv7Yzs5P68", + "effects": { + "checkpoint": { + "sequenceNumber": 7 + } + } + } + ] + } + } +} + +task 20, lines 227-246: +//# run-graphql +Response: { + "data": { + "transactionBlocks": { + "pageInfo": { + "startCursor": "eyJjIjo3LCJ0Ijo2LCJpIjpmYWxzZX0", + "endCursor": "eyJjIjo3LCJ0Ijo2LCJpIjpmYWxzZX0", + "hasPreviousPage": false, + "hasNextPage": false + }, + "nodes": [ + { + "digest": "2Ba4H9L5Lgqvf7j42ZQt1io3EZq4QzRVMX9KQcHVi3fY", + "effects": { + "checkpoint": { + "sequenceNumber": 5 + } + } + } + ] + } + } +} + +task 21, lines 248-268: +//# run-graphql --cursors {"c":7,"t":6,"i":false} +Response: { + "data": { + "transactionBlocks": { + "pageInfo": { + "startCursor": "eyJjIjo3LCJ0Ijo3LCJpIjpmYWxzZX0", + "endCursor": "eyJjIjo3LCJ0Ijo4LCJpIjpmYWxzZX0", + "hasPreviousPage": true, + "hasNextPage": false + }, + "nodes": [ + { + "digest": "3JTu6zCWqr6ntrcXes17pVGaDsi5TohbZG3KmJMB69wd", + "effects": { + "checkpoint": { + "sequenceNumber": 6 + } + } + }, + { + "digest": "6d7jabN5eUfM7okBxvBZM39HAjAB7tmUuSFv7Yzs5P68", + "effects": { + "checkpoint": { + "sequenceNumber": 7 + } + } + } + ] + } + } +} + +task 22, lines 270-289: +//# run-graphql --cursors {"c":7,"t":0,"i":false} +Response: { + "data": { + "transactionBlocks": { + "pageInfo": { + "startCursor": null, + "endCursor": null, + "hasPreviousPage": false, + "hasNextPage": false + }, + "nodes": [] + } + } +} + +task 23, lines 291-310: +//# run-graphql --cursors {"c":7,"t":0,"i":true} +Response: { + "data": { + "transactionBlocks": { + "pageInfo": { + "startCursor": "eyJjIjo3LCJ0Ijo2LCJpIjpmYWxzZX0", + "endCursor": "eyJjIjo3LCJ0Ijo4LCJpIjpmYWxzZX0", + "hasPreviousPage": false, + "hasNextPage": false + }, + "nodes": [ + { + "digest": "2Ba4H9L5Lgqvf7j42ZQt1io3EZq4QzRVMX9KQcHVi3fY", + "effects": { + "checkpoint": { + "sequenceNumber": 5 + } + } + }, + { + "digest": "3JTu6zCWqr6ntrcXes17pVGaDsi5TohbZG3KmJMB69wd", + "effects": { + "checkpoint": { + "sequenceNumber": 6 + } + } + }, + { + "digest": "6d7jabN5eUfM7okBxvBZM39HAjAB7tmUuSFv7Yzs5P68", + "effects": { + "checkpoint": { + "sequenceNumber": 7 + } + } + } + ] + } + } +} + +task 24, lines 312-331: +//# run-graphql --cursors {"c":7,"t":0,"i":true} +Response: { + "data": { + "transactionBlocks": { + "pageInfo": { + "startCursor": "eyJjIjo3LCJ0Ijo2LCJpIjp0cnVlfQ", + "endCursor": "eyJjIjo3LCJ0Ijo4LCJpIjp0cnVlfQ", + "hasPreviousPage": false, + "hasNextPage": false + }, + "nodes": [ + { + "digest": "2Ba4H9L5Lgqvf7j42ZQt1io3EZq4QzRVMX9KQcHVi3fY", + "effects": { + "checkpoint": { + "sequenceNumber": 5 + } + } + }, + { + "digest": "3JTu6zCWqr6ntrcXes17pVGaDsi5TohbZG3KmJMB69wd", + "effects": { + "checkpoint": { + "sequenceNumber": 6 + } + } + }, + { + "digest": "6d7jabN5eUfM7okBxvBZM39HAjAB7tmUuSFv7Yzs5P68", + "effects": { + "checkpoint": { + "sequenceNumber": 7 + } + } + } + ] + } + } +} diff --git a/crates/sui-graphql-e2e-tests/tests/stable/prune/transactions.move b/crates/sui-graphql-e2e-tests/tests/stable/prune/transactions.move new file mode 100644 index 0000000000000..2135998d9d667 --- /dev/null +++ b/crates/sui-graphql-e2e-tests/tests/stable/prune/transactions.move @@ -0,0 +1,331 @@ +// Copyright (c) Mysten Labs, Inc. +// SPDX-License-Identifier: Apache-2.0 + +//# init --protocol-version 51 --addresses Test=0x0 --accounts A --simulator --epochs-to-keep 2 + +//# publish +module Test::M1 { + use sui::coin::Coin; + + public struct Object has key, store { + id: UID, + value: u64, + } + + fun foo(_p1: u64, value1: T, _value2: &Coin, _p2: u64): T { + value1 + } + + public entry fun create(value: u64, recipient: address, ctx: &mut TxContext) { + transfer::public_transfer( + Object { id: object::new(ctx), value }, + recipient + ) + } +} + +//# run Test::M1::create --sender A --args 0 @A + +//# create-checkpoint + +//# advance-epoch + +//# run Test::M1::create --sender A --args 1 @A + +//# create-checkpoint + +//# advance-epoch + +//# run Test::M1::create --sender A --args 2 @A + +//# create-checkpoint + +//# advance-epoch + +//# run Test::M1::create --sender A --args 3 @A + +//# create-checkpoint + +//# run-graphql --wait-for-checkpoint-pruned 4 +# The smallest unpruned epoch is 2, starting with checkpoint sequence number 5 +# When a range is not specified, transactions queries should return results starting from the smallest unpruned tx +{ + epoch { + epochId + } + checkpoints { + nodes { + epoch { + epochId + } + sequenceNumber + } + } + unfiltered: transactionBlocks { + pageInfo { + startCursor + endCursor + hasPreviousPage + hasNextPage + } + nodes { + digest + effects { + checkpoint { + sequenceNumber + } + } + } + } + transactionBlocks(filter: { sentAddress: "@{A}" }) { + pageInfo { + startCursor + endCursor + hasPreviousPage + hasNextPage + } + nodes { + digest + effects { + checkpoint { + sequenceNumber + } + } + } + } +} + +//# run-graphql --wait-for-checkpoint-pruned 4 +# In the absence of an upper bound, graphql sets the upper bound to `checkpoint_viewed_at`'s max tx + 1 (right-open interval) +{ + transactionBlocks(filter: { afterCheckpoint: 5 sentAddress: "@{A}" }) { + pageInfo { + startCursor + endCursor + hasPreviousPage + hasNextPage + } + nodes { + digest + effects { + checkpoint { + sequenceNumber + } + } + } + } +} + +//# run-graphql --wait-for-checkpoint-pruned 4 +# In the absence of a lower bound, graphql sets the lower bound to the smallest unpruned checkpoint's min tx +{ + transactionBlocks(filter: { beforeCheckpoint: 7 sentAddress: "@{A}" }) { + pageInfo { + startCursor + endCursor + hasPreviousPage + hasNextPage + } + nodes { + digest + effects { + checkpoint { + sequenceNumber + } + } + } + } +} + + + +//# run-graphql --wait-for-checkpoint-pruned 4 +# If the caller tries to fetch data outside of the unpruned range, they should receive an empty connection. +{ + transactionBlocks(filter: { atCheckpoint: 0 sentAddress: "@{A}" }) { + pageInfo { + startCursor + endCursor + hasPreviousPage + hasNextPage + } + nodes { + digest + effects { + checkpoint { + sequenceNumber + } + } + } + } +} + +//# run-graphql --wait-for-checkpoint-pruned 4 +# Empty response if caller tries to fetch data beyond the available upper bound +{ + transactionBlocks(filter: { atCheckpoint: 10 sentAddress: "@{A}" }) { + pageInfo { + startCursor + endCursor + hasPreviousPage + hasNextPage + } + nodes { + digest + effects { + checkpoint { + sequenceNumber + } + } + } + } +} + + +//# run-graphql +# Mirror from the back +{ + transactionBlocks(last: 10 filter: { sentAddress: "@{A}" }) { + pageInfo { + startCursor + endCursor + hasPreviousPage + hasNextPage + } + nodes { + digest + effects { + checkpoint { + sequenceNumber + } + } + } + } +} + +//# run-graphql +# Mirror from the back +{ + transactionBlocks(last: 10 filter: { afterCheckpoint: 5 sentAddress: "@{A}" }) { + pageInfo { + startCursor + endCursor + hasPreviousPage + hasNextPage + } + nodes { + digest + effects { + checkpoint { + sequenceNumber + } + } + } + } +} + +//# run-graphql +# Mirror from the back +{ + transactionBlocks(last: 10 filter: { beforeCheckpoint: 7 sentAddress: "@{A}" }) { + pageInfo { + startCursor + endCursor + hasPreviousPage + hasNextPage + } + nodes { + digest + effects { + checkpoint { + sequenceNumber + } + } + } + } +} + +//# run-graphql --cursors {"c":7,"t":6,"i":false} +# The first tx after pruning has seq num 6. +# When viewed at checkpoint 7, there are two more txs that follow it. +{ + transactionBlocks(after: "@{cursor_0}") { + pageInfo { + startCursor + endCursor + hasPreviousPage + hasNextPage + } + nodes { + digest + effects { + checkpoint { + sequenceNumber + } + } + } + } +} + +//# run-graphql --cursors {"c":7,"t":0,"i":false} +# Data is pruned and no longer available +{ + transactionBlocks(after: "@{cursor_0}") { + pageInfo { + startCursor + endCursor + hasPreviousPage + hasNextPage + } + nodes { + digest + effects { + checkpoint { + sequenceNumber + } + } + } + } +} + +//# run-graphql --cursors {"c":7,"t":0,"i":true} +# Data is pruned and no longer available +{ + transactionBlocks(after: "@{cursor_0}") { + pageInfo { + startCursor + endCursor + hasPreviousPage + hasNextPage + } + nodes { + digest + effects { + checkpoint { + sequenceNumber + } + } + } + } +} + +//# run-graphql --cursors {"c":7,"t":0,"i":true} +# Data is pruned and no longer available +{ + transactionBlocks(scanLimit: 10000 after: "@{cursor_0}") { + pageInfo { + startCursor + endCursor + hasPreviousPage + hasNextPage + } + nodes { + digest + effects { + checkpoint { + sequenceNumber + } + } + } + } +} diff --git a/crates/sui-graphql-rpc/src/server/builder.rs b/crates/sui-graphql-rpc/src/server/builder.rs index 9537f19c27d6e..396eb0eb93308 100644 --- a/crates/sui-graphql-rpc/src/server/builder.rs +++ b/crates/sui-graphql-rpc/src/server/builder.rs @@ -652,7 +652,7 @@ async fn health_check( .unwrap_or_else(|| DEFAULT_MAX_CHECKPOINT_LAG); let checkpoint_timestamp = - Duration::from_millis(watermark_lock.read().await.checkpoint_timestamp_ms); + Duration::from_millis(watermark_lock.read().await.hi_cp_timestamp_ms); let now_millis = Utc::now().timestamp_millis(); @@ -731,9 +731,11 @@ pub mod tests { let pg_conn_pool = PgManager::new(reader); let cancellation_token = CancellationToken::new(); let watermark = Watermark { - checkpoint: 1, - checkpoint_timestamp_ms: 1, + hi_cp: 1, + hi_cp_timestamp_ms: 1, epoch: 0, + lo_cp: 0, + lo_tx: 0, }; let state = AppState::new( connection_config.clone(), diff --git a/crates/sui-graphql-rpc/src/server/watermark_task.rs b/crates/sui-graphql-rpc/src/server/watermark_task.rs index ca7bb81e8ea70..949fe0c94bb65 100644 --- a/crates/sui-graphql-rpc/src/server/watermark_task.rs +++ b/crates/sui-graphql-rpc/src/server/watermark_task.rs @@ -6,7 +6,10 @@ use crate::error::Error; use crate::metrics::Metrics; use crate::types::chain_identifier::ChainIdentifier; use async_graphql::ServerError; -use diesel::{ExpressionMethods, OptionalExtension, QueryDsl}; +use diesel::{ + query_dsl::positional_order_dsl::PositionalOrderDsl, CombineDsl, ExpressionMethods, + OptionalExtension, QueryDsl, +}; use diesel_async::scoped_futures::ScopedFutureExt; use std::mem; use std::sync::Arc; @@ -40,15 +43,18 @@ pub(crate) type WatermarkLock = Arc>; /// changes. #[derive(Clone, Copy, Default)] pub(crate) struct Watermark { - /// The checkpoint upper-bound for the query. - pub checkpoint: u64, - /// The checkpoint upper-bound timestamp for the query. - pub checkpoint_timestamp_ms: u64, + /// The inclusive checkpoint upper-bound for the query. + pub hi_cp: u64, + /// The timestamp of the inclusive upper-bound checkpoint for the query. + pub hi_cp_timestamp_ms: u64, /// The current epoch. pub epoch: u64, + /// Smallest queryable checkpoint - checkpoints below this value are pruned. + pub lo_cp: u64, + pub lo_tx: u64, } -/// Starts an infinite loop that periodically updates the `checkpoint_viewed_at` high watermark. +/// Starts an infinite loop that periodically updates the watermark. impl WatermarkTask { pub(crate) fn new( db: Db, @@ -83,7 +89,7 @@ impl WatermarkTask { return; }, _ = interval.tick() => { - let Watermark {checkpoint, epoch, checkpoint_timestamp_ms } = match Watermark::query(&self.db).await { + let Watermark { lo_cp, lo_tx, hi_cp, hi_cp_timestamp_ms, epoch } = match Watermark::query(&self.db).await { Ok(Some(watermark)) => watermark, Ok(None) => continue, Err(e) => { @@ -96,11 +102,14 @@ impl WatermarkTask { // Write the watermark as follows to limit how long we hold the lock let prev_epoch = { let mut w = self.watermark.write().await; - w.checkpoint = checkpoint; - w.checkpoint_timestamp_ms = checkpoint_timestamp_ms; + w.hi_cp = hi_cp; + w.hi_cp_timestamp_ms = hi_cp_timestamp_ms; + w.lo_cp = lo_cp; + w.lo_tx = lo_tx; mem::replace(&mut w.epoch, epoch) }; + // On epoch boundary, notify subscribers if epoch > prev_epoch { self.sender.send(epoch).unwrap(); } @@ -155,21 +164,43 @@ impl Watermark { pub(crate) async fn new(lock: WatermarkLock) -> Self { let w = lock.read().await; Self { - checkpoint: w.checkpoint, - checkpoint_timestamp_ms: w.checkpoint_timestamp_ms, + hi_cp: w.hi_cp, + hi_cp_timestamp_ms: w.hi_cp_timestamp_ms, epoch: w.epoch, + lo_cp: w.lo_cp, + lo_tx: w.lo_tx, } } + #[allow(clippy::type_complexity)] pub(crate) async fn query(db: &Db) -> Result, Error> { use checkpoints::dsl; - let Some((checkpoint, checkpoint_timestamp_ms, epoch)): Option<(i64, i64, i64)> = db + let Some(result): Option)>> = db .execute(move |conn| { async { - conn.first(move || { - dsl::checkpoints - .select((dsl::sequence_number, dsl::timestamp_ms, dsl::epoch)) + conn.results(move || { + let min_cp = dsl::checkpoints + .select(( + dsl::sequence_number, + dsl::timestamp_ms, + dsl::epoch, + dsl::min_tx_sequence_number, + )) + .order_by(dsl::sequence_number.asc()) + .limit(1); + + let max_cp = dsl::checkpoints + .select(( + dsl::sequence_number, + dsl::timestamp_ms, + dsl::epoch, + dsl::min_tx_sequence_number, + )) .order_by(dsl::sequence_number.desc()) + .limit(1); + + // Order by sequence_number, which is in the 1st position + min_cp.union_all(max_cp).positional_order_by(1) }) .await .optional() @@ -177,14 +208,37 @@ impl Watermark { .scope_boxed() }) .await - .map_err(|e| Error::Internal(format!("Failed to fetch checkpoint: {e}")))? + .map_err(|e| Error::Internal(format!("Failed to fetch watermark data: {e}")))? else { + // An empty response from the db is valid when indexer has not written any checkpoints + // to the db yet. return Ok(None); }; + + let (lo_cp, lo_tx) = if let Some((cp, _, _, Some(tx))) = result.first() { + (cp, tx) + } else { + return Err(Error::Internal( + "Expected entry for tx lower bound and min_tx_sequence_number to be non-null" + .to_string(), + )); + }; + + let (hi_cp, hi_cp_timestamp_ms, epoch) = + if let Some((cp, timestamp_ms, epoch, _)) = result.last() { + (cp, timestamp_ms, epoch) + } else { + return Err(Error::Internal( + "Expected entry for tx upper bound".to_string(), + )); + }; + Ok(Some(Watermark { - checkpoint: checkpoint as u64, - checkpoint_timestamp_ms: checkpoint_timestamp_ms as u64, - epoch: epoch as u64, + hi_cp: *hi_cp as u64, + hi_cp_timestamp_ms: *hi_cp_timestamp_ms as u64, + epoch: *epoch as u64, + lo_cp: *lo_cp as u64, + lo_tx: *lo_tx as u64, })) } } diff --git a/crates/sui-graphql-rpc/src/types/epoch.rs b/crates/sui-graphql-rpc/src/types/epoch.rs index ab205f3b1c2f3..ce52c66e8c420 100644 --- a/crates/sui-graphql-rpc/src/types/epoch.rs +++ b/crates/sui-graphql-rpc/src/types/epoch.rs @@ -126,8 +126,8 @@ impl Epoch { let last = match self.stored.last_checkpoint_id { Some(last) => last as u64, None => { - let Watermark { checkpoint, .. } = *ctx.data_unchecked(); - checkpoint + let Watermark { hi_cp, .. } = *ctx.data_unchecked(); + hi_cp } }; diff --git a/crates/sui-graphql-rpc/src/types/query.rs b/crates/sui-graphql-rpc/src/types/query.rs index 5b0e48ef65514..d9077b352408f 100644 --- a/crates/sui-graphql-rpc/src/types/query.rs +++ b/crates/sui-graphql-rpc/src/types/query.rs @@ -74,8 +74,8 @@ impl Query { /// Range of checkpoints that the RPC has data available for (for data /// that can be tied to a particular checkpoint). async fn available_range(&self, ctx: &Context<'_>) -> Result { - let Watermark { checkpoint, .. } = *ctx.data()?; - AvailableRange::query(ctx.data_unchecked(), checkpoint) + let Watermark { hi_cp, .. } = *ctx.data()?; + AvailableRange::query(ctx.data_unchecked(), hi_cp) .await .extend() } @@ -211,10 +211,10 @@ impl Query { address: SuiAddress, root_version: Option, ) -> Result> { - let Watermark { checkpoint, .. } = *ctx.data()?; + let Watermark { hi_cp, .. } = *ctx.data()?; Ok(Some(Owner { address, - checkpoint_viewed_at: checkpoint, + checkpoint_viewed_at: hi_cp, root_version: root_version.map(|v| v.into()), })) } @@ -227,10 +227,10 @@ impl Query { address: SuiAddress, version: Option, ) -> Result> { - let Watermark { checkpoint, .. } = *ctx.data()?; + let Watermark { hi_cp, .. } = *ctx.data()?; let key = match version { - Some(version) => Object::at_version(version.into(), checkpoint), - None => Object::latest_at(checkpoint), + Some(version) => Object::at_version(version.into(), hi_cp), + None => Object::latest_at(hi_cp), }; Object::query(ctx, address, key).await.extend() @@ -252,10 +252,10 @@ impl Query { address: SuiAddress, version: Option, ) -> Result> { - let Watermark { checkpoint, .. } = *ctx.data()?; + let Watermark { hi_cp, .. } = *ctx.data()?; let key = match version { - Some(version) => MovePackage::by_version(version.into(), checkpoint), - None => MovePackage::by_id_at(checkpoint), + Some(version) => MovePackage::by_version(version.into(), hi_cp), + None => MovePackage::by_id_at(hi_cp), }; MovePackage::query(ctx, address, key).await.extend() @@ -270,19 +270,19 @@ impl Query { ctx: &Context<'_>, address: SuiAddress, ) -> Result> { - let Watermark { checkpoint, .. } = *ctx.data()?; - MovePackage::query(ctx, address, MovePackage::latest_at(checkpoint)) + let Watermark { hi_cp, .. } = *ctx.data()?; + MovePackage::query(ctx, address, MovePackage::latest_at(hi_cp)) .await .extend() } /// Look-up an Account by its SuiAddress. async fn address(&self, ctx: &Context<'_>, address: SuiAddress) -> Result> { - let Watermark { checkpoint, .. } = *ctx.data()?; + let Watermark { hi_cp, .. } = *ctx.data()?; Ok(Some(Address { address, - checkpoint_viewed_at: checkpoint, + checkpoint_viewed_at: hi_cp, })) } @@ -297,8 +297,8 @@ impl Query { /// Fetch epoch information by ID (defaults to the latest epoch). async fn epoch(&self, ctx: &Context<'_>, id: Option) -> Result> { - let Watermark { checkpoint, .. } = *ctx.data()?; - Epoch::query(ctx, id.map(|id| id.into()), checkpoint) + let Watermark { hi_cp, .. } = *ctx.data()?; + Epoch::query(ctx, id.map(|id| id.into()), hi_cp) .await .extend() } @@ -310,8 +310,8 @@ impl Query { ctx: &Context<'_>, id: Option, ) -> Result> { - let Watermark { checkpoint, .. } = *ctx.data()?; - Checkpoint::query(ctx, id.unwrap_or_default(), checkpoint) + let Watermark { hi_cp, .. } = *ctx.data()?; + Checkpoint::query(ctx, id.unwrap_or_default(), hi_cp) .await .extend() } @@ -322,8 +322,8 @@ impl Query { ctx: &Context<'_>, digest: Digest, ) -> Result> { - let Watermark { checkpoint, .. } = *ctx.data()?; - let lookup = TransactionBlock::by_digest(digest, checkpoint); + let Watermark { hi_cp, .. } = *ctx.data()?; + let lookup = TransactionBlock::by_digest(digest, hi_cp); TransactionBlock::query(ctx, lookup).await.extend() } @@ -340,7 +340,7 @@ impl Query { before: Option, type_: Option, ) -> Result> { - let Watermark { checkpoint, .. } = *ctx.data()?; + let Watermark { hi_cp, .. } = *ctx.data()?; let page = Page::from_params(ctx.data_unchecked(), first, after, last, before)?; let coin = type_.map_or_else(GAS::type_tag, |t| t.0); @@ -349,7 +349,7 @@ impl Query { page, coin, /* owner */ None, - checkpoint, + hi_cp, ) .await .extend() @@ -364,10 +364,10 @@ impl Query { last: Option, before: Option, ) -> Result> { - let Watermark { checkpoint, .. } = *ctx.data()?; + let Watermark { hi_cp, .. } = *ctx.data()?; let page = Page::from_params(ctx.data_unchecked(), first, after, last, before)?; - Epoch::paginate(ctx.data_unchecked(), page, checkpoint) + Epoch::paginate(ctx.data_unchecked(), page, hi_cp) .await .extend() } @@ -381,17 +381,12 @@ impl Query { last: Option, before: Option, ) -> Result> { - let Watermark { checkpoint, .. } = *ctx.data()?; + let Watermark { hi_cp, .. } = *ctx.data()?; let page = Page::from_params(ctx.data_unchecked(), first, after, last, before)?; - Checkpoint::paginate( - ctx.data_unchecked(), - page, - /* epoch */ None, - checkpoint, - ) - .await - .extend() + Checkpoint::paginate(ctx.data_unchecked(), page, /* epoch */ None, hi_cp) + .await + .extend() } /// The transaction blocks that exist in the network. @@ -424,19 +419,13 @@ impl Query { filter: Option, scan_limit: Option, ) -> Result> { - let Watermark { checkpoint, .. } = *ctx.data()?; + let Watermark { hi_cp, .. } = *ctx.data()?; let page = Page::from_params(ctx.data_unchecked(), first, after, last, before)?; - TransactionBlock::paginate( - ctx, - page, - filter.unwrap_or_default(), - checkpoint, - scan_limit, - ) - .await - .extend() + TransactionBlock::paginate(ctx, page, filter.unwrap_or_default(), hi_cp, scan_limit) + .await + .extend() } /// Query events that are emitted in the network. @@ -451,14 +440,14 @@ impl Query { before: Option, filter: Option, ) -> Result> { - let Watermark { checkpoint, .. } = *ctx.data()?; + let Watermark { hi_cp, .. } = *ctx.data()?; let page = Page::from_params(ctx.data_unchecked(), first, after, last, before)?; Event::paginate( ctx.data_unchecked(), page, filter.unwrap_or_default(), - checkpoint, + hi_cp, ) .await .extend() @@ -474,14 +463,14 @@ impl Query { before: Option, filter: Option, ) -> Result> { - let Watermark { checkpoint, .. } = *ctx.data()?; + let Watermark { hi_cp, .. } = *ctx.data()?; let page = Page::from_params(ctx.data_unchecked(), first, after, last, before)?; Object::paginate( ctx.data_unchecked(), page, filter.unwrap_or_default(), - checkpoint, + hi_cp, ) .await .extend() @@ -501,10 +490,10 @@ impl Query { before: Option, filter: Option, ) -> Result> { - let Watermark { checkpoint, .. } = *ctx.data()?; + let Watermark { hi_cp, .. } = *ctx.data()?; let page = Page::from_params(ctx.data_unchecked(), first, after, last, before)?; - MovePackage::paginate_by_checkpoint(ctx.data_unchecked(), page, filter, checkpoint) + MovePackage::paginate_by_checkpoint(ctx.data_unchecked(), page, filter, hi_cp) .await .extend() } @@ -522,10 +511,10 @@ impl Query { address: SuiAddress, filter: Option, ) -> Result> { - let Watermark { checkpoint, .. } = *ctx.data()?; + let Watermark { hi_cp, .. } = *ctx.data()?; let page = Page::from_params(ctx.data_unchecked(), first, after, last, before)?; - MovePackage::paginate_by_version(ctx.data_unchecked(), page, address, filter, checkpoint) + MovePackage::paginate_by_version(ctx.data_unchecked(), page, address, filter, hi_cp) .await .extend() } @@ -548,14 +537,14 @@ impl Query { ctx: &Context<'_>, domain: Domain, ) -> Result> { - let Watermark { checkpoint, .. } = *ctx.data()?; - Ok(NameService::resolve_to_record(ctx, &domain, checkpoint) + let Watermark { hi_cp, .. } = *ctx.data()?; + Ok(NameService::resolve_to_record(ctx, &domain, hi_cp) .await .extend()? .and_then(|r| r.target_address) .map(|a| Address { address: a.into(), - checkpoint_viewed_at: checkpoint, + checkpoint_viewed_at: hi_cp, })) } @@ -565,17 +554,15 @@ impl Query { ctx: &Context<'_>, name: String, ) -> Result> { - let Watermark { checkpoint, .. } = *ctx.data()?; + let Watermark { hi_cp, .. } = *ctx.data()?; - NamedMovePackage::query(ctx, &name, checkpoint) - .await - .extend() + NamedMovePackage::query(ctx, &name, hi_cp).await.extend() } /// Fetch a type that includes dot move service names in it. async fn type_by_name(&self, ctx: &Context<'_>, name: String) -> Result { - let Watermark { checkpoint, .. } = *ctx.data()?; - let type_tag = NamedType::query(ctx, &name, checkpoint).await?; + let Watermark { hi_cp, .. } = *ctx.data()?; + let type_tag = NamedType::query(ctx, &name, hi_cp).await?; Ok(type_tag.into()) } @@ -587,8 +574,8 @@ impl Query { ctx: &Context<'_>, coin_type: ExactTypeFilter, ) -> Result> { - let Watermark { checkpoint, .. } = *ctx.data()?; - CoinMetadata::query(ctx.data_unchecked(), coin_type.0, checkpoint) + let Watermark { hi_cp, .. } = *ctx.data()?; + CoinMetadata::query(ctx.data_unchecked(), coin_type.0, hi_cp) .await .extend() } diff --git a/crates/sui-graphql-rpc/src/types/transaction_block/mod.rs b/crates/sui-graphql-rpc/src/types/transaction_block/mod.rs index 7bef4cf389438..9ad81d126c76a 100644 --- a/crates/sui-graphql-rpc/src/types/transaction_block/mod.rs +++ b/crates/sui-graphql-rpc/src/types/transaction_block/mod.rs @@ -170,8 +170,8 @@ impl TransactionBlock { // Non-stored transactions have a sentinel checkpoint_viewed_at value that generally // prevents access to further queries, but inputs should generally be available so try // to access them at the high watermark. - let Watermark { checkpoint, .. } = *ctx.data_unchecked(); - checkpoint + let Watermark { hi_cp, .. } = *ctx.data_unchecked(); + hi_cp }; Some(GasInput::from( @@ -342,6 +342,9 @@ impl TransactionBlock { /// and `function` should provide a value for `scan_limit`. This modifies querying behavior by /// limiting how many transactions to scan through before applying filters, and also affects /// pagination behavior. + /// + /// Queries for data that have been pruned will return an empty connection; we treat pruned data + /// as simply non-existent and thus no error is returned. pub(crate) async fn paginate( ctx: &Context<'_>, page: Page, @@ -350,6 +353,10 @@ impl TransactionBlock { scan_limit: Option, ) -> Result, Error> { let limits = &ctx.data_unchecked::().limits; + let db: &Db = ctx.data_unchecked(); + // If we've entered this function, we already fetched `checkpoint_viewed_at` from the + // `Watermark`, and so we must be able to retrieve `lo_cp` as well. + let Watermark { lo_cp, .. } = *ctx.data_unchecked(); // If the caller has provided some arbitrary combination of `function`, `kind`, // `recvAddress`, `inputObject`, or `changedObject`, we require setting a `scanLimit`. @@ -375,17 +382,19 @@ impl TransactionBlock { } } + let cursor_viewed_at = page.validate_cursor_consistency()?; + let checkpoint_viewed_at = cursor_viewed_at.unwrap_or(checkpoint_viewed_at); + let is_from_front = page.is_from_front(); + let cp_after = filter.after_checkpoint.map(u64::from); + let cp_at = filter.at_checkpoint.map(u64::from); + let cp_before = filter.before_checkpoint.map(u64::from); + // If page size or scan limit is 0, we want to standardize behavior by returning an empty // connection if filter.is_empty() || page.limit() == 0 || scan_limit.is_some_and(|v| v == 0) { return Ok(ScanConnection::new(false, false)); } - let cursor_viewed_at = page.validate_cursor_consistency()?; - let checkpoint_viewed_at = cursor_viewed_at.unwrap_or(checkpoint_viewed_at); - let db: &Db = ctx.data_unchecked(); - let is_from_front = page.is_from_front(); - use transactions::dsl as tx; let (prev, next, transactions, tx_bounds): ( bool, @@ -397,9 +406,10 @@ impl TransactionBlock { async move { let Some(tx_bounds) = TxBounds::query( conn, - filter.after_checkpoint.map(u64::from), - filter.at_checkpoint.map(u64::from), - filter.before_checkpoint.map(u64::from), + cp_after, + cp_at, + cp_before, + lo_cp, checkpoint_viewed_at, scan_limit, &page, diff --git a/crates/sui-graphql-rpc/src/types/transaction_block/tx_lookups.rs b/crates/sui-graphql-rpc/src/types/transaction_block/tx_lookups.rs index 46941617a56bd..22640672caed0 100644 --- a/crates/sui-graphql-rpc/src/types/transaction_block/tx_lookups.rs +++ b/crates/sui-graphql-rpc/src/types/transaction_block/tx_lookups.rs @@ -63,7 +63,7 @@ use crate::{ type_filter::{FqNameFilter, ModuleFilter}, }, }; -use diesel::{ExpressionMethods, OptionalExtension, QueryDsl}; +use diesel::{CombineDsl, ExpressionMethods, QueryDsl}; use std::fmt::Write; use sui_indexer::schema::checkpoints; @@ -103,7 +103,8 @@ use sui_indexer::schema::checkpoints; #[derive(Clone, Debug, Copy)] pub(crate) struct TxBounds { /// The inclusive lower bound tx_sequence_number derived from checkpoint bounds. If checkpoint - /// bounds are not provided, this will default to `0`. + /// bounds are not provided, this will default to the smallest transaction sequence number of + /// the earliest checkpoint that has not been pruned. tx_lo: u64, /// The exclusive upper bound tx_sequence_number derived from checkpoint bounds. If checkpoint @@ -129,20 +130,30 @@ impl TxBounds { /// Determines the `tx_sequence_number` range from the checkpoint bounds for a transaction block /// query. If no checkpoint range is specified, the default is between 0 and the /// `checkpoint_viewed_at`. The corresponding `tx_sequence_number` range is fetched from db, and - /// further adjusted by cursors and scan limit. If there are any inconsistencies or invalid - /// combinations, i.e. `after` cursor is greater than the upper bound, return None. + /// further adjusted by cursors and scan limit. If the checkpoints cannot be found, or if there + /// are any inconsistencies or invalid combinations, i.e. `after` cursor is greater than the + /// upper bound, return None. pub(crate) async fn query( conn: &mut Conn<'_>, cp_after: Option, cp_at: Option, cp_before: Option, + min_unpruned_checkpoint: u64, checkpoint_viewed_at: u64, scan_limit: Option, page: &Page, ) -> Result, diesel::result::Error> { - // Lowerbound in terms of checkpoint sequence number. We want to get the total transaction - // count of the checkpoint before this one, or 0 if there is no previous checkpoint. - let cp_lo = max_option([cp_after.map(|x| x.saturating_add(1)), cp_at]).unwrap_or(0); + // Inclusive lowerbound in terms of checkpoint sequence number. If a lower bound is not set, + // we will default to the smallest checkpoint available from the database, retrieved from + // the watermark. + // + // SAFETY: we can unwrap because of the `Some(min_unpruned_checkpoint)` + let cp_lo = max_option([ + cp_after.map(|x| x.saturating_add(1)), + cp_at, + Some(min_unpruned_checkpoint), + ]) + .unwrap(); let cp_before_inclusive = match cp_before { // There are no results strictly before checkpoint 0. @@ -151,65 +162,88 @@ impl TxBounds { None => None, }; - // Upperbound in terms of checkpoint sequence number. We want to get the total transaction - // count at the end of this checkpoint. If no upperbound is given, use - // `checkpoint_viewed_at`. + // Inclusive upper bound in terms of checkpoint sequence number. If no upperbound is given, + // use `checkpoint_viewed_at`. // // SAFETY: we can unwrap because of the `Some(checkpoint_viewed_at) let cp_hi = min_option([cp_before_inclusive, cp_at, Some(checkpoint_viewed_at)]).unwrap(); + // Read from the `checkpoints` table rather than the `pruner_cp_watermark` table, because + // the `checkpoints` table is pruned first. use checkpoints::dsl; - let (tx_lo, tx_hi) = if let Some(cp_prev) = cp_lo.checked_sub(1) { - let res: Vec = conn + // Inclusive lower bound and exclusive upper bound of the transaction sequence number range. + let (tx_lo, tx_hi) = { + let res: Vec<(i64, Option, i64)> = conn .results(move || { - dsl::checkpoints - .select(dsl::network_total_transactions) - .filter(dsl::sequence_number.eq_any([cp_prev as i64, cp_hi as i64])) - .order_by(dsl::network_total_transactions.asc()) + let min_cp_range = dsl::checkpoints + .select(( + dsl::sequence_number, + dsl::min_tx_sequence_number, + dsl::network_total_transactions, + )) + .filter(dsl::sequence_number.eq(cp_lo as i64)) + .limit(1); + + let max_cp_range = dsl::checkpoints + .select(( + dsl::sequence_number, + dsl::min_tx_sequence_number, + dsl::network_total_transactions, + )) + .filter(dsl::sequence_number.eq(cp_hi as i64)) + .limit(1); + + min_cp_range.union_all(max_cp_range) }) .await?; - // If there are not two distinct results, it means that the transaction bounds are - // empty (lo and hi are the same), or it means that the one or other of the checkpoints - // doesn't exist, so we can return early. - let &[lo, hi] = res.as_slice() else { + let Some(hi_record) = res + .iter() + .find(|&(checkpoint, _, _)| *checkpoint == cp_hi as i64) + else { return Ok(None); }; - (lo as u64, hi as u64) - } else { - let res: Option = conn - .first(move || { - dsl::checkpoints - .select(dsl::network_total_transactions) - .filter(dsl::sequence_number.eq(cp_hi as i64)) - }) - .await - .optional()?; - - // If there is no result, it means that the checkpoint doesn't exist, so we can return - // early. - let Some(hi) = res else { + let Some(lo_record) = res + .iter() + .find(|&(checkpoint, _, _)| *checkpoint == cp_lo as i64) + else { return Ok(None); }; - (0, hi as u64) + let tx_lo = match lo_record.1 { + Some(lo) => Ok(lo), + None => Err(diesel::result::Error::DatabaseError( + diesel::result::DatabaseErrorKind::Unknown, + Box::new( + "min_tx_sequence_number should never be None in production".to_string(), + ), + )), + }? as u64; + + (tx_lo, hi_record.2 as u64) }; - // If the cursors point outside checkpoint bounds, we can return early. - if matches!(page.after(), Some(a) if tx_hi <= a.tx_sequence_number.saturating_add(1)) { - return Ok(None); - } - - if matches!(page.before(), Some(b) if b.tx_sequence_number <= tx_lo) { - return Ok(None); + let cursor_lo_exclusive = page.after().map(|a| a.tx_sequence_number); + let cursor_lo = cursor_lo_exclusive.map(|a| a.saturating_add(1)); + let cursor_hi = page.before().map(|b| b.tx_sequence_number); + + match (cursor_lo, cursor_hi) { + (Some(lo), _) if tx_hi <= lo => return Ok(None), + (_, Some(hi)) if hi <= tx_lo => return Ok(None), + (Some(lo), Some(hi)) if hi <= lo => return Ok(None), + _ => { + if tx_hi <= tx_lo { + return Ok(None); + } + } } Ok(Some(Self { tx_lo, tx_hi, - cursor_lo_exclusive: page.after().map(|a| a.tx_sequence_number), - cursor_hi: page.before().map(|b| b.tx_sequence_number), + cursor_lo_exclusive, + cursor_hi, scan_limit, end: page.end(), })) diff --git a/crates/sui-graphql-rpc/src/types/zklogin_verify_signature.rs b/crates/sui-graphql-rpc/src/types/zklogin_verify_signature.rs index eb51fe116f263..80b5bcb2c71de 100644 --- a/crates/sui-graphql-rpc/src/types/zklogin_verify_signature.rs +++ b/crates/sui-graphql-rpc/src/types/zklogin_verify_signature.rs @@ -54,10 +54,10 @@ pub(crate) async fn verify_zklogin_signature( intent_scope: ZkLoginIntentScope, author: SuiAddress, ) -> Result { - let Watermark { checkpoint, .. } = *ctx.data_unchecked(); + let Watermark { hi_cp, .. } = *ctx.data_unchecked(); // get current epoch from db. - let Some(curr_epoch) = Epoch::query(ctx, None, checkpoint).await? else { + let Some(curr_epoch) = Epoch::query(ctx, None, hi_cp).await? else { return Err(Error::Internal( "Cannot get current epoch from db".to_string(), )); @@ -88,7 +88,7 @@ pub(crate) async fn verify_zklogin_signature( bcs: Base64(bcs::to_bytes(&1u64).unwrap()), }, DynamicFieldType::DynamicField, - checkpoint, + hi_cp, ) .await .map_err(|e| as_jwks_read_error(e.to_string()))?;