Skip to content

Commit

Permalink
pick(19817): graphql: port MovePackage queries to full_objects_history (
Browse files Browse the repository at this point in the history
#19848)

## Description

Spotted that MovePackage queries were still using `objects_history`,
which is not going to work after that table gets pruned to two epochs.

This PR replaces queries to `objects_history` with queries to
`full_objects_history`, with support from `objects_version` and
`packages` to fetch the checkpoint sequence number where necessary.

## Test plan

Existing tests:

```
sui$ cargo nextest run -p sui-graphql-e2e-tests
```

And ran the following queries on the mainnet DB:

```graphql
query AllPackages($after: String) {
  packages(first: 5 after: $after) {
    pageInfo {
      hasNextPage
      endCursor
      startCursor
    }
    nodes {
      address
      version
    }
  }
}

query AllPackagesBack($before: String) {
  packages(last: 5 before: $before) {
    pageInfo {
      hasNextPage
      endCursor
      startCursor
    }
    nodes {
      address
      version
    }
  }
}

query SystemPackages($after: String) {
  packageVersions(address: "0x2" first: 5 after: $after) {
    pageInfo {
      hasNextPage
      endCursor
      startCursor
    }

    nodes {
      address
      version
    }
  }
}

query SystemPackagesBack($before: String) {
  packageVersions(address: "0x2" last: 5 before: $before) {
    pageInfo {
      hasNextPage
      endCursor
    }

    nodes {
      address
      version
    }
  }
}

query UserPackage($after: String) {
  packageVersions(
    address: "0xbc3df36be17f27ac98e3c839b2589db8475fa07b20657b08e8891e3aaf5ee5f9"
		first: 5
    after: $after
  ) {
    pageInfo {
      hasNextPage
      endCursor
      startCursor
    }
    nodes {
      address
      version
    }
  }
}
```

---

## Release notes

Check each box that your changes affect. If none of the boxes relate to
your changes, release notes aren't required.

For each box you select, include information after the relevant heading
that describes the impact of your changes that a user might notice and
any actions they must take to implement updates.

- [ ] Protocol: 
- [ ] Nodes (Validators and Full nodes): 
- [ ] Indexer: 
- [ ] JSON-RPC: 
- [ ] GraphQL: 
- [ ] CLI: 
- [ ] Rust SDK:
- [ ] REST API:
  • Loading branch information
amnn authored Oct 14, 2024
1 parent 33d1cd6 commit 7a42022
Show file tree
Hide file tree
Showing 2 changed files with 80 additions and 60 deletions.
137 changes: 77 additions & 60 deletions crates/sui-graphql-rpc/src/types/move_package.rs
Original file line number Diff line number Diff line change
Expand Up @@ -32,7 +32,7 @@ use diesel::prelude::QueryableByName;
use diesel::{BoolExpressionMethods, ExpressionMethods, JoinOnDsl, QueryDsl, Selectable};
use diesel_async::scoped_futures::ScopedFutureExt;
use serde::{Deserialize, Serialize};
use sui_indexer::models::objects::StoredHistoryObject;
use sui_indexer::models::objects::StoredFullHistoryObject;
use sui_indexer::schema::packages;
use sui_package_resolver::{error::Error as PackageCacheError, Package as ParsedMovePackage};
use sui_types::is_system_package;
Expand Down Expand Up @@ -123,9 +123,10 @@ struct TypeOrigin {
#[derive(Selectable, QueryableByName)]
#[diesel(table_name = packages)]
struct StoredHistoryPackage {
checkpoint_sequence_number: i64,
original_id: Vec<u8>,
#[diesel(embed)]
object: StoredHistoryObject,
object: StoredFullHistoryObject,
}

pub(crate) struct MovePackageDowncastError;
Expand All @@ -136,6 +137,10 @@ pub(crate) type Cursor = BcsCursor<PackageCursor>;
/// The inner struct for the `MovePackage` cursor. The package is identified by the checkpoint it
/// was created in, its original ID, and its version, and the `checkpoint_viewed_at` specifies the
/// checkpoint snapshot that the data came from.
///
/// The cursor includes the checkpoint the package was created in as well, so that when we paginate
/// through all the packages on-chain, if we pause half way through, we can pick back up based on
/// the checkpoint we've seen so far.
#[derive(Serialize, Deserialize, Clone, PartialEq, Eq, Debug)]
pub(crate) struct PackageCursor {
pub checkpoint_sequence_number: u64,
Expand Down Expand Up @@ -711,26 +716,26 @@ impl MovePackage {
async move {
let mut q = query!(
r#"
SELECT
p.original_id,
o.*
FROM
packages p
INNER JOIN
objects_history o
ON
p.package_id = o.object_id
AND p.package_version = o.object_version
AND p.checkpoint_sequence_number = o.checkpoint_sequence_number
"#
SELECT
p.checkpoint_sequence_number,
p.original_id,
o.*
FROM
packages p
INNER JOIN
full_objects_history o
ON
p.package_id = o.object_id
AND p.package_version = o.object_version
"#
);

q = filter!(
q,
format!("o.checkpoint_sequence_number < {before_checkpoint}")
format!("p.checkpoint_sequence_number < {before_checkpoint}")
);
if let Some(after) = after_checkpoint {
q = filter!(q, format!("{after} < o.checkpoint_sequence_number"));
q = filter!(q, format!("{after} < p.checkpoint_sequence_number"));
}

page.paginate_raw_query::<StoredHistoryPackage>(conn, checkpoint_viewed_at, q)
Expand All @@ -745,8 +750,7 @@ impl MovePackage {
// The "checkpoint viewed at" sets a consistent upper bound for the nested queries.
for stored in results {
let cursor = stored.cursor(checkpoint_viewed_at).encode_cursor();
let package =
MovePackage::try_from_stored_history_object(stored.object, checkpoint_viewed_at)?;
let package = MovePackage::try_from_serialized(stored.object, checkpoint_viewed_at)?;
conn.edges.push(Edge::new(cursor, package));
}

Expand Down Expand Up @@ -798,8 +802,7 @@ impl MovePackage {
// The "checkpoint viewed at" sets a consistent upper bound for the nested queries.
for stored in results {
let cursor = stored.cursor(checkpoint_viewed_at).encode_cursor();
let package =
MovePackage::try_from_stored_history_object(stored.object, checkpoint_viewed_at)?;
let package = MovePackage::try_from_serialized(stored.object, checkpoint_viewed_at)?;
conn.edges.push(Edge::new(cursor, package));
}

Expand All @@ -809,15 +812,19 @@ impl MovePackage {
/// `checkpoint_viewed_at` points to the checkpoint snapshot that this `MovePackage` came from.
/// This is stored in the `MovePackage` so that related fields from the package are read from
/// the same checkpoint (consistently).
pub(crate) fn try_from_stored_history_object(
history_object: StoredHistoryObject,
pub(crate) fn try_from_serialized(
history_object: StoredFullHistoryObject,
checkpoint_viewed_at: u64,
) -> Result<Self, Error> {
let object = Object::try_from_stored_history_object(
history_object,
let object = Object::new_serialized(
SuiAddress::from_bytes(&history_object.object_id)
.map_err(|_| Error::Internal("Invalid package ID".to_string()))?,
history_object.object_version as u64,
history_object.serialized_object,
checkpoint_viewed_at,
/* root_version */ None,
)?;
history_object.object_version as u64,
);

Self::try_from(&object).map_err(|_| Error::Internal("Not a package!".to_string()))
}
}
Expand All @@ -833,12 +840,12 @@ impl RawPaginated<Cursor> for StoredHistoryPackage {
filter!(
query,
format!(
"o.checkpoint_sequence_number > {cp} OR (\
o.checkpoint_sequence_number = {cp} AND
original_id > '\\x{id}'::bytea OR (\
original_id = '\\x{id}'::bytea AND \
o.object_version >= {pv}\
))",
"(p.checkpoint_sequence_number > {cp} OR (\
p.checkpoint_sequence_number = {cp} AND \
(original_id > '\\x{id}'::bytea OR (\
original_id = '\\x{id}'::bytea AND \
object_version >= {pv}\
))))",
cp = cursor.checkpoint_sequence_number,
id = hex::encode(&cursor.original_id),
pv = cursor.package_version,
Expand All @@ -850,12 +857,12 @@ impl RawPaginated<Cursor> for StoredHistoryPackage {
filter!(
query,
format!(
"o.checkpoint_sequence_number < {cp} OR (\
o.checkpoint_sequence_number = {cp} AND
original_id < '\\x{id}'::bytea OR (\
original_id = '\\x{id}'::bytea AND \
o.object_version <= {pv}\
))",
"(p.checkpoint_sequence_number < {cp} OR (\
p.checkpoint_sequence_number = {cp} AND \
(original_id < '\\x{id}'::bytea OR (\
original_id = '\\x{id}'::bytea AND \
object_version <= {pv}\
))))",
cp = cursor.checkpoint_sequence_number,
id = hex::encode(&cursor.original_id),
pv = cursor.package_version,
Expand All @@ -866,22 +873,22 @@ impl RawPaginated<Cursor> for StoredHistoryPackage {
fn order(asc: bool, query: RawQuery) -> RawQuery {
if asc {
query
.order_by("o.checkpoint_sequence_number ASC")
.order_by("original_id ASC")
.order_by("o.object_version ASC")
.order_by("1 ASC") // checkpoint_sequence_number
.order_by("2 ASC") // original_id
.order_by("object_version ASC")
} else {
query
.order_by("o.checkpoint_sequence_number DESC")
.order_by("original_id DESC")
.order_by("o.object_version DESC")
.order_by("1 DESC") // checkpoint_sequence_number
.order_by("2 DESC") // original_id
.order_by("object_version DESC")
}
}
}

impl Target<Cursor> for StoredHistoryPackage {
fn cursor(&self, checkpoint_viewed_at: u64) -> Cursor {
Cursor::new(PackageCursor {
checkpoint_sequence_number: self.object.checkpoint_sequence_number as u64,
checkpoint_sequence_number: self.checkpoint_sequence_number as u64,
original_id: self.original_id.clone(),
package_version: self.object.object_version as u64,
checkpoint_viewed_at,
Expand Down Expand Up @@ -1049,7 +1056,11 @@ impl TryFrom<&Object> for MovePackage {
}

/// Query for fetching all the versions of a system package (assumes that `package` has already been
/// verified as a system package). This is an `objects_history` query disguised as a package query.
/// verified as a system package). This is a `full_objects_history` query disguised as a package query.
///
/// We do this because the `packages` table contains only one entry per package ID. For the system
/// packages, this is the latest version of the package (for user packages, there is only one entry
/// per package ID anyway as each version of a package gets its own ID).
fn system_package_version_query(
package: SuiAddress,
filter: Option<MovePackageVersionFilter>,
Expand All @@ -1058,35 +1069,42 @@ fn system_package_version_query(
let mut q = query!(
r#"
SELECT
o.object_id AS original_id,
p.checkpoint_sequence_number,
p.original_id,
o.*
FROM
objects_version v
FROM (
SELECT
object_id AS package_id,
object_id AS original_id,
object_version AS package_version,
cp_sequence_number AS checkpoint_sequence_number
FROM
objects_version
) p
LEFT JOIN
objects_history o
full_objects_history o
ON
v.object_id = o.object_id
AND v.object_version = o.object_version
AND v.cp_sequence_number = o.checkpoint_sequence_number
p.package_id = o.object_id
AND p.package_version = o.object_version
"#
);

q = filter!(
q,
format!(
"v.object_id = '\\x{}'::bytea",
"original_id = '\\x{}'::bytea",
hex::encode(package.into_vec())
)
);

if let Some(after) = filter.as_ref().and_then(|f| f.after_version) {
let a: u64 = after.into();
q = filter!(q, format!("v.object_version > {a}"));
q = filter!(q, format!("object_version > {a}"));
}

if let Some(before) = filter.as_ref().and_then(|f| f.before_version) {
let b: u64 = before.into();
q = filter!(q, format!("v.object_version < {b}"));
q = filter!(q, format!("object_version < {b}"));
}

q
Expand All @@ -1101,20 +1119,19 @@ fn user_package_version_query(
let mut q = query!(
r#"
SELECT
p.checkpoint_sequence_number,
p.original_id,
o.*
FROM
packages q
INNER JOIN
packages p
ON
q.original_id = p.original_id
USING (original_id)
INNER JOIN
objects_history o
full_objects_history o
ON
p.package_id = o.object_id
AND p.package_version = o.object_version
AND p.checkpoint_sequence_number = o.checkpoint_sequence_number
"#
);

Expand Down
3 changes: 3 additions & 0 deletions crates/sui-indexer/src/store/pg_indexer_store.rs
Original file line number Diff line number Diff line change
Expand Up @@ -826,7 +826,10 @@ impl PgIndexerStore {
.do_update()
.set((
packages::package_id.eq(excluded(packages::package_id)),
packages::package_version.eq(excluded(packages::package_version)),
packages::move_package.eq(excluded(packages::move_package)),
packages::checkpoint_sequence_number
.eq(excluded(packages::checkpoint_sequence_number)),
))
.execute(conn)
.await?;
Expand Down

0 comments on commit 7a42022

Please sign in to comment.