diff --git a/crates/torii/graphql/src/constants.rs b/crates/torii/graphql/src/constants.rs index 2d851f07b1..fc5a376f56 100644 --- a/crates/torii/graphql/src/constants.rs +++ b/crates/torii/graphql/src/constants.rs @@ -9,6 +9,8 @@ pub const EVENT_MESSAGE_TABLE: &str = "event_messages"; pub const MODEL_TABLE: &str = "models"; pub const TRANSACTION_TABLE: &str = "transactions"; pub const METADATA_TABLE: &str = "metadata"; +pub const ERC_BALANCE_TABLE: &str = "balances"; +pub const ERC_TRANSFER_TABLE: &str = "erc_transfers"; pub const ID_COLUMN: &str = "id"; pub const EVENT_ID_COLUMN: &str = "event_id"; diff --git a/crates/torii/graphql/src/object/erc/erc_balance.rs b/crates/torii/graphql/src/object/erc/erc_balance.rs index a749350fee..848303e71a 100644 --- a/crates/torii/graphql/src/object/erc/erc_balance.rs +++ b/crates/torii/graphql/src/object/erc/erc_balance.rs @@ -1,15 +1,27 @@ +use async_graphql::connection::PageInfo; use async_graphql::dynamic::{Field, FieldFuture, InputValue, TypeRef}; use async_graphql::{Name, Value}; use convert_case::{Case, Casing}; use serde::Deserialize; -use sqlx::{FromRow, Pool, Sqlite, SqliteConnection}; +use sqlx::sqlite::SqliteRow; +use sqlx::{FromRow, Pool, Row, Sqlite, SqliteConnection}; use starknet_crypto::Felt; use torii_core::sql::utils::felt_to_sql_string; use tracing::warn; -use crate::constants::{ERC_BALANCE_NAME, ERC_BALANCE_TYPE_NAME}; +use super::handle_cursor; +use crate::constants::{ + DEFAULT_LIMIT, ERC_BALANCE_NAME, ERC_BALANCE_TABLE, ERC_BALANCE_TYPE_NAME, ID_COLUMN, +}; use crate::mapping::ERC_BALANCE_TYPE_MAPPING; +use crate::object::connection::page_info::PageInfoObject; +use crate::object::connection::{ + connection_arguments, cursor, parse_connection_arguments, ConnectionArguments, +}; use crate::object::{BasicObject, ResolvableObject}; +use crate::query::data::count_rows; +use crate::query::filter::{Comparator, Filter, FilterValue}; +use crate::query::order::{CursorDirection, Direction}; use crate::types::{TypeMapping, ValueMapping}; use crate::utils::extract; @@ -38,20 +50,39 @@ impl ResolvableObject for ErcBalanceObject { TypeRef::named_nn(TypeRef::STRING), ); - let field = Field::new(self.name().0, TypeRef::named_list(self.type_name()), move |ctx| { - FieldFuture::new(async move { - let mut conn = ctx.data::>()?.acquire().await?; - let address = extract::( - ctx.args.as_index_map(), - &account_address.to_case(Case::Camel), - )?; + let mut field = Field::new( + self.name().0, + TypeRef::named(format!("{}Connection", self.type_name())), + move |ctx| { + FieldFuture::new(async move { + let mut conn = ctx.data::>()?.acquire().await?; + let connection = parse_connection_arguments(&ctx)?; + let address = extract::( + ctx.args.as_index_map(), + &account_address.to_case(Case::Camel), + )?; - let erc_balances = fetch_erc_balances(&mut conn, address).await?; + let filter = vec![Filter { + field: "account_address".to_string(), + comparator: Comparator::Eq, + value: FilterValue::String(felt_to_sql_string(&address)), + }]; - Ok(Some(Value::List(erc_balances))) - }) - }) + let total_count = + count_rows(&mut conn, ERC_BALANCE_TABLE, &None, &Some(filter)).await?; + + let (data, page_info) = + fetch_erc_balances(&mut conn, address, &connection, total_count).await?; + + let results = erc_balance_connection_output(&data, total_count, page_info)?; + + Ok(Some(Value::Object(results))) + }) + }, + ) .argument(argument); + + field = connection_arguments(field); vec![field] } } @@ -59,20 +90,132 @@ impl ResolvableObject for ErcBalanceObject { async fn fetch_erc_balances( conn: &mut SqliteConnection, address: Felt, -) -> sqlx::Result> { - let query = "SELECT t.contract_address, t.name, t.symbol, t.decimals, b.balance, b.token_id, \ - c.contract_type - FROM balances b + connection: &ConnectionArguments, + total_count: i64, +) -> sqlx::Result<(Vec, PageInfo)> { + let table_name = ERC_BALANCE_TABLE; + let id_column = format!("b.{}", ID_COLUMN); + + let mut query = format!( + "SELECT b.id, t.contract_address, t.name, t.symbol, t.decimals, b.balance, b.token_id, \ + c.contract_type + FROM {table_name} b JOIN tokens t ON b.token_id = t.id - JOIN contracts c ON t.contract_address = c.contract_address - WHERE b.account_address = ?"; + JOIN contracts c ON t.contract_address = c.contract_address" + ); + let mut conditions = vec!["b.account_address = ?".to_string()]; + + let mut cursor_param = &connection.after; + if let Some(after_cursor) = &connection.after { + conditions.push(handle_cursor(after_cursor, CursorDirection::After, ID_COLUMN)?); + } + + if let Some(before_cursor) = &connection.before { + cursor_param = &connection.before; + conditions.push(handle_cursor(before_cursor, CursorDirection::Before, ID_COLUMN)?); + } + + if !conditions.is_empty() { + query.push_str(&format!(" WHERE {}", conditions.join(" AND "))); + } + + let is_cursor_based = connection.first.or(connection.last).is_some() || cursor_param.is_some(); + + let data_limit = + connection.first.or(connection.last).or(connection.limit).unwrap_or(DEFAULT_LIMIT); + let limit = if is_cursor_based { + match &cursor_param { + Some(_) => data_limit + 2, + None => data_limit + 1, // prev page does not exist + } + } else { + data_limit + }; - let rows = sqlx::query(query).bind(felt_to_sql_string(&address)).fetch_all(conn).await?; + let order_direction = match (connection.first, connection.last) { + (Some(_), _) => Direction::Desc, + (_, Some(_)) => Direction::Asc, + _ => Direction::Desc, + }; - let mut erc_balances = Vec::new(); + query.push_str(&format!(" ORDER BY {id_column} {} LIMIT {limit}", order_direction.as_ref())); + + if let Some(offset) = connection.offset { + query.push_str(&format!(" OFFSET {}", offset)); + } + + let mut data = sqlx::query(&query).bind(felt_to_sql_string(&address)).fetch_all(conn).await?; + let mut page_info = PageInfo { + has_previous_page: false, + has_next_page: false, + start_cursor: None, + end_cursor: None, + }; + + if data.is_empty() { + Ok((data, page_info)) + } else if is_cursor_based { + match cursor_param { + Some(cursor_query) => { + let first_cursor = cursor::encode( + &data[0].try_get::(&id_column)?, + &data[0].try_get_unchecked::(&id_column)?, + ); + + if &first_cursor == cursor_query && data.len() != 1 { + data.remove(0); + page_info.has_previous_page = true; + } else { + data.pop(); + } + + if data.len() as u64 == limit - 1 { + page_info.has_next_page = true; + data.pop(); + } + } + None => { + if data.len() as u64 == limit { + page_info.has_next_page = true; + data.pop(); + } + } + } + + if !data.is_empty() { + page_info.start_cursor = Some(cursor::encode( + &data[0].try_get::(ID_COLUMN)?, + &data[0].try_get_unchecked::(ID_COLUMN)?, + )); + page_info.end_cursor = Some(cursor::encode( + &data[data.len() - 1].try_get::(ID_COLUMN)?, + &data[data.len() - 1].try_get_unchecked::(ID_COLUMN)?, + )); + } + + Ok((data, page_info)) + } else { + let offset = connection.offset.unwrap_or(0); + if 1 < offset && offset < total_count as u64 { + page_info.has_previous_page = true; + } + if limit + offset < total_count as u64 { + page_info.has_next_page = true; + } + + Ok((data, page_info)) + } +} - for row in rows { - let row = BalanceQueryResultRaw::from_row(&row)?; +fn erc_balance_connection_output( + data: &[SqliteRow], + total_count: i64, + page_info: PageInfo, +) -> sqlx::Result { + let mut edges = Vec::new(); + for row in data { + let row = BalanceQueryResultRaw::from_row(row)?; + let cursor = cursor::encode(&row.id, &row.id); let balance_value = match row.contract_type.to_lowercase().as_str() { "erc20" => { @@ -116,10 +259,17 @@ async fn fetch_erc_balances( } }; - erc_balances.push(balance_value); + edges.push(Value::Object(ValueMapping::from([ + (Name::new("node"), balance_value), + (Name::new("cursor"), Value::String(cursor)), + ]))); } - Ok(erc_balances) + Ok(ValueMapping::from([ + (Name::new("totalCount"), Value::from(total_count)), + (Name::new("edges"), Value::List(edges)), + (Name::new("pageInfo"), PageInfoObject::value(page_info)), + ])) } // TODO: This would be required when subscriptions are needed @@ -133,6 +283,7 @@ async fn fetch_erc_balances( #[derive(FromRow, Deserialize, Debug, Clone)] #[serde(rename_all = "camelCase")] struct BalanceQueryResultRaw { + pub id: String, pub contract_address: String, pub name: String, pub symbol: String, diff --git a/crates/torii/graphql/src/object/erc/erc_transfer.rs b/crates/torii/graphql/src/object/erc/erc_transfer.rs index ee522fef20..e1c950db39 100644 --- a/crates/torii/graphql/src/object/erc/erc_transfer.rs +++ b/crates/torii/graphql/src/object/erc/erc_transfer.rs @@ -1,16 +1,26 @@ +use async_graphql::connection::PageInfo; use async_graphql::dynamic::{Field, FieldFuture, InputValue, TypeRef}; use async_graphql::{Name, Value}; use convert_case::{Case, Casing}; use serde::Deserialize; -use sqlx::{FromRow, Pool, Sqlite, SqliteConnection}; +use sqlx::sqlite::SqliteRow; +use sqlx::{FromRow, Pool, Row, Sqlite, SqliteConnection}; use starknet_crypto::Felt; use torii_core::engine::get_transaction_hash_from_event_id; use torii_core::sql::utils::felt_to_sql_string; use tracing::warn; -use crate::constants::{ERC_TRANSFER_NAME, ERC_TRANSFER_TYPE_NAME}; +use super::handle_cursor; +use crate::constants::{ + DEFAULT_LIMIT, ERC_TRANSFER_NAME, ERC_TRANSFER_TABLE, ERC_TRANSFER_TYPE_NAME, ID_COLUMN, +}; use crate::mapping::ERC_TRANSFER_TYPE_MAPPING; +use crate::object::connection::page_info::PageInfoObject; +use crate::object::connection::{ + connection_arguments, cursor, parse_connection_arguments, ConnectionArguments, +}; use crate::object::{BasicObject, ResolvableObject}; +use crate::query::order::{CursorDirection, Direction}; use crate::types::{TypeMapping, ValueMapping}; use crate::utils::extract; @@ -34,31 +44,44 @@ impl BasicObject for ErcTransferObject { impl ResolvableObject for ErcTransferObject { fn resolvers(&self) -> Vec { let account_address = "account_address"; - let limit = "limit"; let arg_addr = InputValue::new( account_address.to_case(Case::Camel), TypeRef::named_nn(TypeRef::STRING), ); - let arg_limit = - InputValue::new(limit.to_case(Case::Camel), TypeRef::named_nn(TypeRef::INT)); - - let field = Field::new(self.name().0, TypeRef::named_list(self.type_name()), move |ctx| { - FieldFuture::new(async move { - let mut conn = ctx.data::>()?.acquire().await?; - let address = extract::( - ctx.args.as_index_map(), - &account_address.to_case(Case::Camel), - )?; - let limit = extract::(ctx.args.as_index_map(), &limit.to_case(Case::Camel))?; - let limit: u32 = limit.try_into()?; - - let erc_transfers = fetch_erc_transfers(&mut conn, address, limit).await?; - - Ok(Some(Value::List(erc_transfers))) - }) - }) - .argument(arg_addr) - .argument(arg_limit); + + let mut field = Field::new( + self.name().0, + TypeRef::named(format!("{}Connection", self.type_name())), + move |ctx| { + FieldFuture::new(async move { + let mut conn = ctx.data::>()?.acquire().await?; + let connection = parse_connection_arguments(&ctx)?; + let address = extract::( + ctx.args.as_index_map(), + &account_address.to_case(Case::Camel), + )?; + + let total_count: (i64,) = sqlx::query_as( + "SELECT COUNT(*) FROM erc_transfers WHERE from_address = ? OR to_address \ + = ?", + ) + .bind(felt_to_sql_string(&address)) + .bind(felt_to_sql_string(&address)) + .fetch_one(&mut *conn) + .await?; + let total_count = total_count.0; + + let (data, page_info) = + fetch_erc_transfers(&mut conn, address, &connection, total_count).await?; + let results = erc_transfer_connection_output(&data, total_count, page_info)?; + + Ok(Some(Value::Object(results))) + }) + }, + ) + .argument(arg_addr); + + field = connection_arguments(field); vec![field] } } @@ -66,9 +89,13 @@ impl ResolvableObject for ErcTransferObject { async fn fetch_erc_transfers( conn: &mut SqliteConnection, address: Felt, - limit: u32, -) -> sqlx::Result> { - let query = format!( + connection: &ConnectionArguments, + total_count: i64, +) -> sqlx::Result<(Vec, PageInfo)> { + let table_name = ERC_TRANSFER_TABLE; + let id_column = format!("et.{}", ID_COLUMN); + + let mut query = format!( r#" SELECT et.id, @@ -83,28 +110,134 @@ SELECT t.decimals, c.contract_type FROM - erc_transfers et + {table_name} et JOIN tokens t ON et.token_id = t.id JOIN contracts c ON t.contract_address = c.contract_address -WHERE - et.from_address = ? OR et.to_address = ? -ORDER BY - et.executed_at DESC -LIMIT {}; "#, - limit ); - let address = felt_to_sql_string(&address); - let rows = sqlx::query(&query).bind(&address).bind(&address).fetch_all(conn).await?; + let mut conditions = vec!["et.from_address = ? OR et.to_address = ?".to_string()]; + + let mut cursor_param = &connection.after; + if let Some(after_cursor) = &connection.after { + conditions.push(handle_cursor(after_cursor, CursorDirection::After, ID_COLUMN)?); + } + + if let Some(before_cursor) = &connection.before { + cursor_param = &connection.before; + conditions.push(handle_cursor(before_cursor, CursorDirection::Before, ID_COLUMN)?); + } + + if !conditions.is_empty() { + query.push_str(&format!(" WHERE {}", conditions.join(" AND "))); + } + + let is_cursor_based = connection.first.or(connection.last).is_some() || cursor_param.is_some(); + + let data_limit = + connection.first.or(connection.last).or(connection.limit).unwrap_or(DEFAULT_LIMIT); + let limit = if is_cursor_based { + match &cursor_param { + Some(_) => data_limit + 2, + None => data_limit + 1, // prev page does not exist + } + } else { + data_limit + }; + + let order_direction = match (connection.first, connection.last) { + (Some(_), _) => Direction::Desc, + (_, Some(_)) => Direction::Asc, + _ => Direction::Desc, + }; + + query.push_str(&format!(" ORDER BY {id_column} {} LIMIT {limit}", order_direction.as_ref())); + + if let Some(offset) = connection.offset { + query.push_str(&format!(" OFFSET {}", offset)); + } + + let mut data = sqlx::query(&query) + .bind(felt_to_sql_string(&address)) + .bind(felt_to_sql_string(&address)) + .fetch_all(conn) + .await?; + + let mut page_info = PageInfo { + has_previous_page: false, + has_next_page: false, + start_cursor: None, + end_cursor: None, + }; + + if data.is_empty() { + Ok((data, page_info)) + } else if is_cursor_based { + match cursor_param { + Some(cursor_query) => { + let first_cursor = cursor::encode( + &data[0].try_get::(ID_COLUMN)?, + &data[0].try_get_unchecked::(ID_COLUMN)?, + ); + + if &first_cursor == cursor_query && data.len() != 1 { + data.remove(0); + page_info.has_previous_page = true; + } else { + data.pop(); + } + + if data.len() as u64 == limit - 1 { + page_info.has_next_page = true; + data.pop(); + } + } + None => { + if data.len() as u64 == limit { + page_info.has_next_page = true; + data.pop(); + } + } + } + + if !data.is_empty() { + page_info.start_cursor = Some(cursor::encode( + &data[0].try_get::(ID_COLUMN)?, + &data[0].try_get_unchecked::(ID_COLUMN)?, + )); + page_info.end_cursor = Some(cursor::encode( + &data[data.len() - 1].try_get::(ID_COLUMN)?, + &data[data.len() - 1].try_get_unchecked::(ID_COLUMN)?, + )); + } + + Ok((data, page_info)) + } else { + let offset = connection.offset.unwrap_or(0); + if 1 < offset && offset < total_count as u64 { + page_info.has_previous_page = true; + } + if limit + offset < total_count as u64 { + page_info.has_next_page = true; + } + + Ok((data, page_info)) + } +} - let mut erc_balances = Vec::new(); +fn erc_transfer_connection_output( + data: &[SqliteRow], + total_count: i64, + page_info: PageInfo, +) -> sqlx::Result { + let mut edges = Vec::new(); - for row in rows { - let row = TransferQueryResultRaw::from_row(&row)?; + for row in data { + let row = TransferQueryResultRaw::from_row(row)?; let transaction_hash = get_transaction_hash_from_event_id(&row.id); + let cursor = cursor::encode(&row.id, &row.id); let transfer_value = match row.contract_type.to_lowercase().as_str() { "erc20" => { @@ -156,10 +289,17 @@ LIMIT {}; } }; - erc_balances.push(transfer_value); + edges.push(Value::Object(ValueMapping::from([ + (Name::new("node"), transfer_value), + (Name::new("cursor"), Value::String(cursor)), + ]))); } - Ok(erc_balances) + Ok(ValueMapping::from([ + (Name::new("totalCount"), Value::from(total_count)), + (Name::new("edges"), Value::List(edges)), + (Name::new("pageInfo"), PageInfoObject::value(page_info)), + ])) } // TODO: This would be required when subscriptions are needed diff --git a/crates/torii/graphql/src/object/erc/mod.rs b/crates/torii/graphql/src/object/erc/mod.rs index eac2c5510b..3e85722cca 100644 --- a/crates/torii/graphql/src/object/erc/mod.rs +++ b/crates/torii/graphql/src/object/erc/mod.rs @@ -1,3 +1,17 @@ +use super::connection::cursor; +use crate::query::order::CursorDirection; + pub mod erc_balance; pub mod erc_token; pub mod erc_transfer; + +fn handle_cursor( + cursor: &str, + direction: CursorDirection, + id_column: &str, +) -> sqlx::Result { + match cursor::decode(cursor) { + Ok((event_id, _)) => Ok(format!("{} {} '{}'", id_column, direction.as_ref(), event_id)), + Err(_) => Err(sqlx::Error::Decode("Invalid cursor format".into())), + } +}