feat(queries)!: Query projections (#5242)
* refactor(queries): separate projections from predicates
* refactor(queries): wrap query outputs into batch tuples
* refactor(queries): transmit a selector tuple along with a query
* refactor(queries): implement selector evaluation; actually evaluate the selector in core
* refactor(queries): add query projection API to the builder
* refactor(queries): update documentation for the query dsl
* fix(queries): make pytests pass
* fix(queries): fix documentation typos
* feat(queries,cli): print `iroha json query` results row-wise


---------

Signed-off-by: ⭐️NINIKA⭐️ <[email protected]>
DCNick3 authored Nov 27, 2024
1 parent 7cb55e4 commit 485ef91
Showing 66 changed files with 3,225 additions and 2,982 deletions.
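The commit message above describes the change at a high level: a query now carries a selector tuple alongside its predicate, so the server can project each matched item down to the requested fields before sending it back. As a rough client-side sketch, the new builder API might be used like this (the `select_with` method name and closure shape are assumptions for illustration; the commit only confirms that the query builder gained a projection API):

use eyre::Result;
use iroha::{client::Client, data_model::prelude::*};

// Hypothetical sketch: `select_with` is an assumed method name; the commit
// only states that the query builder gained a projection (selector) API.
fn asset_ids_of(client: &Client, account: AccountId) -> Result<Vec<AssetId>> {
    let ids = client
        .query(FindAssets::new())
        // the predicate still filters rows on the server, as before
        .filter_with(|asset| asset.id.account.eq(account))
        // the projection trims each matched asset down to its id,
        // so only the ids are serialized and sent over the wire
        .select_with(|asset| asset.id)
        .execute_all()?;
    Ok(ids)
}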
12 changes: 12 additions & 0 deletions Cargo.lock

Some generated files are not rendered by default.

1 change: 1 addition & 0 deletions Cargo.toml
@@ -102,6 +102,7 @@ async-trait = "0.1.81"
strum = { version = "0.25.0", default-features = false }
getset = "0.1.2"
hex-literal = "0.4.1"
derive-where = "1.2.7"

rand = { version = "0.8.5", default-features = false, features = ["getrandom", "alloc"] }
axum = { version = "0.7.5", default-features = false }
15 changes: 6 additions & 9 deletions crates/iroha/src/query.rs
@@ -4,6 +4,7 @@ use std::{collections::HashMap, fmt::Debug};

use eyre::{eyre, Context, Result};
use http::StatusCode;
use iroha_data_model::query::QueryOutputBatchBoxTuple;
use iroha_torii_const::uri as torii_uri;
use parity_scale_codec::{DecodeAll, Encode};
use url::Url;
@@ -16,9 +17,8 @@ use crate::{
query::{
builder::{QueryBuilder, QueryExecutor},
parameters::ForwardCursor,
predicate::HasPredicateBox,
Query, QueryOutput, QueryOutputBatchBox, QueryRequest, QueryResponse, QueryWithParams,
SingularQuery, SingularQueryBox, SingularQueryOutputBox,
Query, QueryOutput, QueryRequest, QueryResponse, QueryWithParams, SingularQuery,
SingularQueryBox, SingularQueryOutputBox,
},
ValidationFail,
},
@@ -158,7 +158,7 @@ impl QueryExecutor for Client {
fn start_query(
&self,
query: QueryWithParams,
) -> Result<(QueryOutputBatchBox, u64, Option<Self::Cursor>), Self::Error> {
) -> Result<(QueryOutputBatchBoxTuple, u64, Option<Self::Cursor>), Self::Error> {
let request_head = self.get_query_request_head();

let request = QueryRequest::Start(query);
@@ -178,7 +178,7 @@

fn continue_query(
cursor: Self::Cursor,
) -> Result<(QueryOutputBatchBox, u64, Option<Self::Cursor>), Self::Error> {
) -> Result<(QueryOutputBatchBoxTuple, u64, Option<Self::Cursor>), Self::Error> {
let QueryCursor {
request_head,
cursor,
Expand Down Expand Up @@ -235,10 +235,7 @@ impl Client {
}

/// Build an iterable query and return a builder object
pub fn query<Q>(
&self,
query: Q,
) -> QueryBuilder<Self, Q, <<Q as Query>::Item as HasPredicateBox>::PredicateBoxType>
pub fn query<Q>(&self, query: Q) -> QueryBuilder<Self, Q, Q::Item>
where
Q: Query,
{
@@ -96,7 +96,7 @@ async fn multiple_blocks_created() -> Result<()> {
client
.query(FindAssets::new())
.filter_with(|asset| {
asset.id.account.eq(account_id) & asset.id.definition_id.eq(definition)
asset.id.account.eq(account_id) & asset.id.definition.eq(definition)
})
.execute_all()
})
8 changes: 7 additions & 1 deletion crates/iroha/tests/pagination.rs
@@ -3,6 +3,7 @@ use iroha::{
client::Client,
data_model::{asset::AssetDefinition, prelude::*},
};
use iroha_data_model::query::dsl::SelectorTuple;
use iroha_test_network::*;
use nonzero_ext::nonzero;

@@ -60,7 +61,12 @@ fn fetch_size_should_work() -> Result<()> {
register_assets(&client)?;

let query = QueryWithParams::new(
QueryWithFilter::new(FindAssetsDefinitions::new(), CompoundPredicate::PASS).into(),
QueryWithFilter::new(
FindAssetsDefinitions::new(),
CompoundPredicate::PASS,
SelectorTuple::default(),
)
.into(),
QueryParams::new(
Pagination::new(Some(nonzero!(7_u64)), 1),
Sorting::default(),
11 changes: 4 additions & 7 deletions crates/iroha/tests/sorting.rs
@@ -4,10 +4,7 @@ use eyre::{Result, WrapErr as _};
use iroha::{
client::QueryResult,
crypto::KeyPair,
data_model::{
account::Account, name::Name, prelude::*,
query::predicate::predicate_atoms::asset::AssetPredicateBox,
},
data_model::{account::Account, name::Name, prelude::*},
};
use iroha_test_network::*;
use iroha_test_samples::ALICE_ID;
@@ -24,7 +21,7 @@ fn correct_pagination_assets_after_creating_new_one() {
let missing_indices = vec![N_ASSETS / 2];
let pagination = Pagination::new(Some(nonzero!(N_ASSETS as u64 / 3)), N_ASSETS as u64 / 3);
let xor_filter =
AssetPredicateBox::build(|asset| asset.id.definition_id.name.starts_with("xor"));
CompoundPredicate::<Asset>::build(|asset| asset.id.definition.name.starts_with("xor"));

let sort_by_metadata_key = "sort".parse::<Name>().expect("Valid");
let sorting = Sorting::by_metadata_key(sort_by_metadata_key.clone());
@@ -201,7 +198,7 @@ fn correct_sorting_of_entities() {
let res = test_client
.query(FindAccounts::new())
.with_sorting(Sorting::by_metadata_key(sort_by_metadata_key.clone()))
.filter_with(|account| account.id.domain_id.eq(domain_id))
.filter_with(|account| account.id.domain.eq(domain_id))
.execute_all()
.expect("Valid");

@@ -339,7 +336,7 @@ fn sort_only_elements_which_have_sorting_key() -> Result<()> {
let res = test_client
.query(FindAccounts::new())
.with_sorting(Sorting::by_metadata_key(sort_by_metadata_key))
.filter_with(|account| account.id.domain_id.eq(domain_id))
.filter_with(|account| account.id.domain.eq(domain_id))
.execute_all()
.wrap_err("Failed to submit request")?;

51 changes: 34 additions & 17 deletions crates/iroha_cli/src/main.rs
@@ -251,12 +251,7 @@ fn submit(
}

mod filter {
use iroha::data_model::query::predicate::{
predicate_atoms::{
account::AccountPredicateBox, asset::AssetPredicateBox, domain::DomainPredicateBox,
},
CompoundPredicate,
};
use iroha::data_model::query::dsl::CompoundPredicate;
use serde::Deserialize;

use super::*;
@@ -265,32 +260,32 @@
#[derive(Clone, Debug, clap::Parser)]
pub struct DomainFilter {
/// Predicate for filtering given as JSON5 string
#[clap(value_parser = parse_json5::<CompoundPredicate<DomainPredicateBox>>)]
pub predicate: CompoundPredicate<DomainPredicateBox>,
#[clap(value_parser = parse_json5::<CompoundPredicate<Domain>>)]
pub predicate: CompoundPredicate<Domain>,
}

/// Filter for account queries
#[derive(Clone, Debug, clap::Parser)]
pub struct AccountFilter {
/// Predicate for filtering given as JSON5 string
#[clap(value_parser = parse_json5::<CompoundPredicate<AccountPredicateBox>>)]
pub predicate: CompoundPredicate<AccountPredicateBox>,
#[clap(value_parser = parse_json5::<CompoundPredicate<Account>>)]
pub predicate: CompoundPredicate<Account>,
}

/// Filter for asset queries
#[derive(Clone, Debug, clap::Parser)]
pub struct AssetFilter {
/// Predicate for filtering given as JSON5 string
#[clap(value_parser = parse_json5::<CompoundPredicate<AssetPredicateBox>>)]
pub predicate: CompoundPredicate<AssetPredicateBox>,
#[clap(value_parser = parse_json5::<CompoundPredicate<Asset>>)]
pub predicate: CompoundPredicate<Asset>,
}

/// Filter for asset definition queries
#[derive(Clone, Debug, clap::Parser)]
pub struct AssetDefinitionFilter {
/// Predicate for filtering given as JSON5 string
#[clap(value_parser = parse_json5::<CompoundPredicate<AssetDefinitionPredicateBox>>)]
pub predicate: CompoundPredicate<AssetDefinitionPredicateBox>,
#[clap(value_parser = parse_json5::<CompoundPredicate<AssetDefinition>>)]
pub predicate: CompoundPredicate<AssetDefinition>,
}

fn parse_json5<T>(s: &str) -> Result<T, String>
@@ -1219,18 +1214,40 @@ mod json {
// we can't really do type-erased iterable queries in a nice way right now...
use iroha::data_model::query::builder::QueryExecutor;

let (mut first_batch, _remaining_items, mut continue_cursor) =
let (mut accumulated_batch, _remaining_items, mut continue_cursor) =
client.start_query(query)?;

while let Some(cursor) = continue_cursor {
let (next_batch, _remaining_items, next_continue_cursor) =
<Client as QueryExecutor>::continue_query(cursor)?;

first_batch.extend(next_batch);
accumulated_batch.extend(next_batch);
continue_cursor = next_continue_cursor;
}

context.print_data(&first_batch)?;
// for efficiency reasons iroha encodes query results in a columnar format,
// so we need to transpose the batch to get the format that is more natural for humans
let mut batches = vec![Vec::new(); accumulated_batch.len()];
for batch in accumulated_batch.into_iter() {
// downcast to json and extract the actual array
// dynamic typing is just easier to use here than introducing a bunch of new types only for iroha_cli
let batch = serde_json::to_value(batch)?;
let serde_json::Value::Object(batch) = batch else {
panic!("Expected the batch serialization to be a JSON object");
};
let (_ty, batch) = batch
.into_iter()
.next()
.expect("Expected the batch to have exactly one key");
let serde_json::Value::Array(batch_vec) = batch else {
panic!("Expected the batch payload to be a JSON array");
};
for (target, value) in batches.iter_mut().zip(batch_vec) {
target.push(value);
}
}

context.print_data(&batches)?;
}
}

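The transposition loop above exists because, as its comment says, the server encodes results column-wise: each element of the accumulated tuple is one selected column, which serializes to a JSON object with a single type tag mapping to an array of cells. A self-contained sketch of the same transposition over plain serde_json values (the column shapes below are invented for illustration, not taken from Iroha):

use serde_json::{json, Value};

fn main() {
    // Two "columns" shaped like serialized output batches: an object with
    // a single type tag mapping to an array of cells.
    let columns = vec![
        json!({ "Id": ["alice@wonderland", "bob@wonderland"] }),
        json!({ "Numeric": [13, 42] }),
    ];

    // One vector per row; every column has one cell per row.
    let mut rows: Vec<Vec<Value>> = vec![Vec::new(); 2];
    for column in columns {
        let Value::Object(map) = column else {
            panic!("expected the column to serialize to a JSON object");
        };
        let (_ty, cells) = map.into_iter().next().expect("expected one type tag");
        let Value::Array(cells) = cells else {
            panic!("expected the column payload to be a JSON array");
        };
        for (row, cell) in rows.iter_mut().zip(cells) {
            row.push(cell);
        }
    }

    // prints [["alice@wonderland",13],["bob@wonderland",42]]
    println!("{}", serde_json::to_string(&rows).unwrap());
}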
69 changes: 57 additions & 12 deletions crates/iroha_core/src/query/cursor.rs
@@ -2,7 +2,13 @@
use std::{fmt::Debug, num::NonZeroU64};

use iroha_data_model::query::QueryOutputBatchBox;
use iroha_data_model::{
prelude::SelectorTuple,
query::{
dsl::{EvaluateSelector, HasProjection, SelectorMarker},
QueryOutputBatchBox, QueryOutputBatchBoxTuple,
},
};
use parity_scale_codec::{Decode, Encode};
use serde::{Deserialize, Serialize};

@@ -25,29 +31,62 @@
Done,
}

fn evaluate_selector_tuple<T>(
batch: Vec<T>,
selector: &SelectorTuple<T>,
) -> QueryOutputBatchBoxTuple
where
T: HasProjection<SelectorMarker, AtomType = ()> + 'static,
T::Projection: EvaluateSelector<T>,
{
let mut batch_tuple = Vec::new();

let mut iter = selector.iter().peekable();

while let Some(item) = iter.next() {
if iter.peek().is_none() {
// do not clone the last item
batch_tuple.push(item.project(batch.into_iter()));
return QueryOutputBatchBoxTuple { tuple: batch_tuple };
}

batch_tuple.push(item.project_clone(batch.iter()));
}

// this should only happen for empty selectors
QueryOutputBatchBoxTuple { tuple: batch_tuple }
}

trait BatchedTrait {
fn next_batch(
&mut self,
cursor: u64,
) -> Result<(QueryOutputBatchBox, Option<NonZeroU64>), Error>;
) -> Result<(QueryOutputBatchBoxTuple, Option<NonZeroU64>), Error>;
fn remaining(&self) -> u64;
}

struct BatchedInner<I> {
struct BatchedInner<I>
where
I: ExactSizeIterator,
I::Item: HasProjection<SelectorMarker, AtomType = ()>,
{
iter: I,
selector: SelectorTuple<I::Item>,
batch_size: NonZeroU64,
cursor: Option<u64>,
}

impl<I> BatchedTrait for BatchedInner<I>
where
I: ExactSizeIterator,
I::Item: HasProjection<SelectorMarker, AtomType = ()> + 'static,
<I::Item as HasProjection<SelectorMarker>>::Projection: EvaluateSelector<I::Item>,
QueryOutputBatchBox: From<Vec<I::Item>>,
{
fn next_batch(
&mut self,
cursor: u64,
) -> Result<(QueryOutputBatchBox, Option<NonZeroU64>), Error> {
) -> Result<(QueryOutputBatchBoxTuple, Option<NonZeroU64>), Error> {
let Some(server_cursor) = self.cursor else {
// the server is done with the iterator
return Err(Error::Done);
@@ -76,7 +115,9 @@ where
.expect("`u32` should always fit into `usize`"),
)
.collect();
let batch = batch.into();

// evaluate the requested projections
let batch = evaluate_selector_tuple(batch, &self.selector);

// did we get enough elements to continue?
if current_batch_size >= expected_batch_size {
@@ -101,27 +142,31 @@
}
}

/// A query output iterator that combines batching and type erasure.
pub struct QueryBatchedErasedIterator {
/// A query output iterator that combines evaluating selectors, batching and type erasure.
pub struct ErasedQueryIterator {
inner: Box<dyn BatchedTrait + Send + Sync>,
}

impl Debug for QueryBatchedErasedIterator {
impl Debug for ErasedQueryIterator {
fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
f.debug_struct("QueryBatchedErasedIterator").finish()
}
}

impl QueryBatchedErasedIterator {
/// Creates a new batched iterator. Boxes the inner iterator to erase its type.
pub fn new<I>(iter: I, batch_size: NonZeroU64) -> Self
impl ErasedQueryIterator {
/// Creates a new erased query iterator. Boxes the inner iterator to erase its type.
pub fn new<I>(iter: I, selector: SelectorTuple<I::Item>, batch_size: NonZeroU64) -> Self
where
I: ExactSizeIterator + Send + Sync + 'static,
I::Item: HasProjection<SelectorMarker, AtomType = ()> + 'static,
<I::Item as HasProjection<SelectorMarker>>::Projection:
EvaluateSelector<I::Item> + Send + Sync,
QueryOutputBatchBox: From<Vec<I::Item>>,
{
Self {
inner: Box::new(BatchedInner {
iter,
selector,
batch_size,
cursor: Some(0),
}),
@@ -141,7 +186,7 @@ impl QueryBatchedErasedIterator {
pub fn next_batch(
&mut self,
cursor: u64,
) -> Result<(QueryOutputBatchBox, Option<NonZeroU64>), Error> {
) -> Result<(QueryOutputBatchBoxTuple, Option<NonZeroU64>), Error> {
self.inner.next_batch(cursor)
}

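The iterator renamed to `ErasedQueryIterator` above follows a common Rust erasure pattern: a generic inner struct (`BatchedInner`) does the typed work, an object-safe trait (`BatchedTrait`) exposes only what callers need, and boxing as `dyn BatchedTrait` hides the concrete iterator type so live cursors for queries over different item types can be stored uniformly. A minimal sketch of the pattern in isolation (all names below are illustrative, not from this diff):

// Minimal illustration of the type-erasure pattern; names are illustrative.
trait BatchSource {
    fn next_batch(&mut self, size: usize) -> Vec<String>;
}

struct TypedSource<I> {
    iter: I,
}

impl<I> BatchSource for TypedSource<I>
where
    I: Iterator,
    I::Item: ToString,
{
    fn next_batch(&mut self, size: usize) -> Vec<String> {
        // the concrete item type is known here but hidden at the trait boundary
        self.iter.by_ref().take(size).map(|x| x.to_string()).collect()
    }
}

fn main() {
    // Sources over different concrete types stored behind one trait object,
    // much like the core keeps one erased iterator per live query cursor.
    let mut sources: Vec<Box<dyn BatchSource>> = vec![
        Box::new(TypedSource { iter: 0u32.. }),
        Box::new(TypedSource { iter: ["a", "b", "c"].into_iter() }),
    ];
    for source in &mut sources {
        println!("{:?}", source.next_batch(2));
    }
}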