From 7a42022920bb2b3b5841e66f79a452c29e2e20b2 Mon Sep 17 00:00:00 2001 From: Ashok Menon Date: Mon, 14 Oct 2024 18:05:51 +0100 Subject: [PATCH] pick(19817): graphql: port MovePackage queries to full_objects_history (#19848) ## Description Spotted that MovePackage queries were still using `objects_history`, which is not going to work after that table gets pruned to two epochs. This PR replaces queries to `objects_history` with queries to `full_objects_history`, with support from `objects_version` and `packages` to fetch the checkpoint sequence number where necessary. ## Test plan Existing tests: ``` sui$ cargo nextest run -p sui-graphql-e2e-tests ``` And ran the following queries on the mainnet DB: ```graphql query AllPackages($after: String) { packages(first: 5 after: $after) { pageInfo { hasNextPage endCursor startCursor } nodes { address version } } } query AllPackagesBack($before: String) { packages(last: 5 before: $before) { pageInfo { hasNextPage endCursor startCursor } nodes { address version } } } query SystemPackages($after: String) { packageVersions(address: "0x2" first: 5 after: $after) { pageInfo { hasNextPage endCursor startCursor } nodes { address version } } } query SystemPackagesBack($before: String) { packageVersions(address: "0x2" last: 5 before: $before) { pageInfo { hasNextPage endCursor } nodes { address version } } } query UserPackage($after: String) { packageVersions( address: "0xbc3df36be17f27ac98e3c839b2589db8475fa07b20657b08e8891e3aaf5ee5f9" first: 5 after: $after ) { pageInfo { hasNextPage endCursor startCursor } nodes { address version } } } ``` --- ## Release notes Check each box that your changes affect. If none of the boxes relate to your changes, release notes aren't required. For each box you select, include information after the relevant heading that describes the impact of your changes that a user might notice and any actions they must take to implement updates. - [ ] Protocol: - [ ] Nodes (Validators and Full nodes): - [ ] Indexer: - [ ] JSON-RPC: - [ ] GraphQL: - [ ] CLI: - [ ] Rust SDK: - [ ] REST API: --- .../sui-graphql-rpc/src/types/move_package.rs | 137 ++++++++++-------- .../sui-indexer/src/store/pg_indexer_store.rs | 3 + 2 files changed, 80 insertions(+), 60 deletions(-) diff --git a/crates/sui-graphql-rpc/src/types/move_package.rs b/crates/sui-graphql-rpc/src/types/move_package.rs index 6a8b0573cb154..98e442eeeed34 100644 --- a/crates/sui-graphql-rpc/src/types/move_package.rs +++ b/crates/sui-graphql-rpc/src/types/move_package.rs @@ -32,7 +32,7 @@ use diesel::prelude::QueryableByName; use diesel::{BoolExpressionMethods, ExpressionMethods, JoinOnDsl, QueryDsl, Selectable}; use diesel_async::scoped_futures::ScopedFutureExt; use serde::{Deserialize, Serialize}; -use sui_indexer::models::objects::StoredHistoryObject; +use sui_indexer::models::objects::StoredFullHistoryObject; use sui_indexer::schema::packages; use sui_package_resolver::{error::Error as PackageCacheError, Package as ParsedMovePackage}; use sui_types::is_system_package; @@ -123,9 +123,10 @@ struct TypeOrigin { #[derive(Selectable, QueryableByName)] #[diesel(table_name = packages)] struct StoredHistoryPackage { + checkpoint_sequence_number: i64, original_id: Vec, #[diesel(embed)] - object: StoredHistoryObject, + object: StoredFullHistoryObject, } pub(crate) struct MovePackageDowncastError; @@ -136,6 +137,10 @@ pub(crate) type Cursor = BcsCursor; /// The inner struct for the `MovePackage` cursor. The package is identified by the checkpoint it /// was created in, its original ID, and its version, and the `checkpoint_viewed_at` specifies the /// checkpoint snapshot that the data came from. +/// +/// The cursor includes the checkpoint the package was created in as well, so that when we paginate +/// through all the packages on-chain, if we pause half way through, we can pick back up based on +/// the checkpoint we've seen so far. #[derive(Serialize, Deserialize, Clone, PartialEq, Eq, Debug)] pub(crate) struct PackageCursor { pub checkpoint_sequence_number: u64, @@ -711,26 +716,26 @@ impl MovePackage { async move { let mut q = query!( r#" - SELECT - p.original_id, - o.* - FROM - packages p - INNER JOIN - objects_history o - ON - p.package_id = o.object_id - AND p.package_version = o.object_version - AND p.checkpoint_sequence_number = o.checkpoint_sequence_number - "# + SELECT + p.checkpoint_sequence_number, + p.original_id, + o.* + FROM + packages p + INNER JOIN + full_objects_history o + ON + p.package_id = o.object_id + AND p.package_version = o.object_version + "# ); q = filter!( q, - format!("o.checkpoint_sequence_number < {before_checkpoint}") + format!("p.checkpoint_sequence_number < {before_checkpoint}") ); if let Some(after) = after_checkpoint { - q = filter!(q, format!("{after} < o.checkpoint_sequence_number")); + q = filter!(q, format!("{after} < p.checkpoint_sequence_number")); } page.paginate_raw_query::(conn, checkpoint_viewed_at, q) @@ -745,8 +750,7 @@ impl MovePackage { // The "checkpoint viewed at" sets a consistent upper bound for the nested queries. for stored in results { let cursor = stored.cursor(checkpoint_viewed_at).encode_cursor(); - let package = - MovePackage::try_from_stored_history_object(stored.object, checkpoint_viewed_at)?; + let package = MovePackage::try_from_serialized(stored.object, checkpoint_viewed_at)?; conn.edges.push(Edge::new(cursor, package)); } @@ -798,8 +802,7 @@ impl MovePackage { // The "checkpoint viewed at" sets a consistent upper bound for the nested queries. for stored in results { let cursor = stored.cursor(checkpoint_viewed_at).encode_cursor(); - let package = - MovePackage::try_from_stored_history_object(stored.object, checkpoint_viewed_at)?; + let package = MovePackage::try_from_serialized(stored.object, checkpoint_viewed_at)?; conn.edges.push(Edge::new(cursor, package)); } @@ -809,15 +812,19 @@ impl MovePackage { /// `checkpoint_viewed_at` points to the checkpoint snapshot that this `MovePackage` came from. /// This is stored in the `MovePackage` so that related fields from the package are read from /// the same checkpoint (consistently). - pub(crate) fn try_from_stored_history_object( - history_object: StoredHistoryObject, + pub(crate) fn try_from_serialized( + history_object: StoredFullHistoryObject, checkpoint_viewed_at: u64, ) -> Result { - let object = Object::try_from_stored_history_object( - history_object, + let object = Object::new_serialized( + SuiAddress::from_bytes(&history_object.object_id) + .map_err(|_| Error::Internal("Invalid package ID".to_string()))?, + history_object.object_version as u64, + history_object.serialized_object, checkpoint_viewed_at, - /* root_version */ None, - )?; + history_object.object_version as u64, + ); + Self::try_from(&object).map_err(|_| Error::Internal("Not a package!".to_string())) } } @@ -833,12 +840,12 @@ impl RawPaginated for StoredHistoryPackage { filter!( query, format!( - "o.checkpoint_sequence_number > {cp} OR (\ - o.checkpoint_sequence_number = {cp} AND - original_id > '\\x{id}'::bytea OR (\ - original_id = '\\x{id}'::bytea AND \ - o.object_version >= {pv}\ - ))", + "(p.checkpoint_sequence_number > {cp} OR (\ + p.checkpoint_sequence_number = {cp} AND \ + (original_id > '\\x{id}'::bytea OR (\ + original_id = '\\x{id}'::bytea AND \ + object_version >= {pv}\ + ))))", cp = cursor.checkpoint_sequence_number, id = hex::encode(&cursor.original_id), pv = cursor.package_version, @@ -850,12 +857,12 @@ impl RawPaginated for StoredHistoryPackage { filter!( query, format!( - "o.checkpoint_sequence_number < {cp} OR (\ - o.checkpoint_sequence_number = {cp} AND - original_id < '\\x{id}'::bytea OR (\ - original_id = '\\x{id}'::bytea AND \ - o.object_version <= {pv}\ - ))", + "(p.checkpoint_sequence_number < {cp} OR (\ + p.checkpoint_sequence_number = {cp} AND \ + (original_id < '\\x{id}'::bytea OR (\ + original_id = '\\x{id}'::bytea AND \ + object_version <= {pv}\ + ))))", cp = cursor.checkpoint_sequence_number, id = hex::encode(&cursor.original_id), pv = cursor.package_version, @@ -866,14 +873,14 @@ impl RawPaginated for StoredHistoryPackage { fn order(asc: bool, query: RawQuery) -> RawQuery { if asc { query - .order_by("o.checkpoint_sequence_number ASC") - .order_by("original_id ASC") - .order_by("o.object_version ASC") + .order_by("1 ASC") // checkpoint_sequence_number + .order_by("2 ASC") // original_id + .order_by("object_version ASC") } else { query - .order_by("o.checkpoint_sequence_number DESC") - .order_by("original_id DESC") - .order_by("o.object_version DESC") + .order_by("1 DESC") // checkpoint_sequence_number + .order_by("2 DESC") // original_id + .order_by("object_version DESC") } } } @@ -881,7 +888,7 @@ impl RawPaginated for StoredHistoryPackage { impl Target for StoredHistoryPackage { fn cursor(&self, checkpoint_viewed_at: u64) -> Cursor { Cursor::new(PackageCursor { - checkpoint_sequence_number: self.object.checkpoint_sequence_number as u64, + checkpoint_sequence_number: self.checkpoint_sequence_number as u64, original_id: self.original_id.clone(), package_version: self.object.object_version as u64, checkpoint_viewed_at, @@ -1049,7 +1056,11 @@ impl TryFrom<&Object> for MovePackage { } /// Query for fetching all the versions of a system package (assumes that `package` has already been -/// verified as a system package). This is an `objects_history` query disguised as a package query. +/// verified as a system package). This is a `full_objects_history` query disguised as a package query. +/// +/// We do this because the `packages` table contains only one entry per package ID. For the system +/// packages, this is the latest version of the package (for user packages, there is only one entry +/// per package ID anyway as each version of a package gets its own ID). fn system_package_version_query( package: SuiAddress, filter: Option, @@ -1058,35 +1069,42 @@ fn system_package_version_query( let mut q = query!( r#" SELECT - o.object_id AS original_id, + p.checkpoint_sequence_number, + p.original_id, o.* - FROM - objects_version v + FROM ( + SELECT + object_id AS package_id, + object_id AS original_id, + object_version AS package_version, + cp_sequence_number AS checkpoint_sequence_number + FROM + objects_version + ) p LEFT JOIN - objects_history o + full_objects_history o ON - v.object_id = o.object_id - AND v.object_version = o.object_version - AND v.cp_sequence_number = o.checkpoint_sequence_number + p.package_id = o.object_id + AND p.package_version = o.object_version "# ); q = filter!( q, format!( - "v.object_id = '\\x{}'::bytea", + "original_id = '\\x{}'::bytea", hex::encode(package.into_vec()) ) ); if let Some(after) = filter.as_ref().and_then(|f| f.after_version) { let a: u64 = after.into(); - q = filter!(q, format!("v.object_version > {a}")); + q = filter!(q, format!("object_version > {a}")); } if let Some(before) = filter.as_ref().and_then(|f| f.before_version) { let b: u64 = before.into(); - q = filter!(q, format!("v.object_version < {b}")); + q = filter!(q, format!("object_version < {b}")); } q @@ -1101,20 +1119,19 @@ fn user_package_version_query( let mut q = query!( r#" SELECT + p.checkpoint_sequence_number, p.original_id, o.* FROM packages q INNER JOIN packages p - ON - q.original_id = p.original_id + USING (original_id) INNER JOIN - objects_history o + full_objects_history o ON p.package_id = o.object_id AND p.package_version = o.object_version - AND p.checkpoint_sequence_number = o.checkpoint_sequence_number "# ); diff --git a/crates/sui-indexer/src/store/pg_indexer_store.rs b/crates/sui-indexer/src/store/pg_indexer_store.rs index 742180b88b3cf..df9c490e49223 100644 --- a/crates/sui-indexer/src/store/pg_indexer_store.rs +++ b/crates/sui-indexer/src/store/pg_indexer_store.rs @@ -826,7 +826,10 @@ impl PgIndexerStore { .do_update() .set(( packages::package_id.eq(excluded(packages::package_id)), + packages::package_version.eq(excluded(packages::package_version)), packages::move_package.eq(excluded(packages::move_package)), + packages::checkpoint_sequence_number + .eq(excluded(packages::checkpoint_sequence_number)), )) .execute(conn) .await?;