Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Introduce new deserialization API #1057

Merged
merged 25 commits into from
Nov 12, 2024
Merged
Changes from 1 commit
Commits
Show all changes
25 commits
Select commit. Hold shift + click to select a range.
51c1005
iterator: fix QueryPager docstring
wprzytula Nov 7, 2024
e67e5da
query_result: fix QueryRowsResult's docstrings
wprzytula Nov 10, 2024
463b9b5
treewide: rename Session to LegacySession
piodul Mar 15, 2023
ed57255
session: make generic and introduce "session kind" parameter
piodul Mar 16, 2023
8e36957
session: move query-related methods to a separate block
piodul Mar 16, 2023
6daf833
session: re-introduce the Session type as an alias
piodul Mar 16, 2023
c1416dd
session_builder: rename build->build_legacy and then reintroduce
piodul Mar 16, 2023
e9d4719
tests: scylla_supports_tablets[_legacy] suffix
wprzytula Aug 8, 2024
37ff7c6
session: partly de-genericise internal query/exec functions
piodul Mar 13, 2023
c229ae5
session: return new QueryResult from internal methods
piodul Mar 13, 2023
22f28cd
session: add interface methods for the new deser API
piodul Mar 13, 2023
2ec2885
connection: switch to the new deserialization framework
piodul Mar 14, 2023
b3f4a04
caching_session: make generic over session APIs
piodul Mar 14, 2023
001b5bb
caching_session: fix docstring references
wprzytula Nov 6, 2024
db6bee0
caching_session: modernize tests
piodul Mar 14, 2023
6d9d971
connection: migrate query_iter to new deserialization framework
wprzytula Mar 12, 2024
f3aae01
topology: reduce `query_filter_keyspace_name` monomorphisation penalty
wprzytula Nov 7, 2024
2b5f386
{session,tracing}: switch to the new deser framework for tracing info
piodul Mar 17, 2023
5919cf9
treewide: switch tests to use the new framework
wprzytula Mar 12, 2024
9a092f9
examples: adjust to use the new interface
piodul Mar 14, 2023
a204a7b
codewide: migrate doctests to new deser API
wprzytula Aug 14, 2024
e99b875
session_test: regression test empty collections deserialization
wprzytula May 21, 2024
f1e7e02
codewide: introduce DeserializeOwned{Row,Value}
wprzytula Nov 10, 2024
98b382d
iterator: rename RowIteratorWorker to PagerWorker
wprzytula Nov 12, 2024
d4a222c
iterator: fix QueryPager::rows_stream() lifetime constraints
wprzytula Nov 12, 2024
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Prev Previous commit
Next Next commit
connection: migrate query_iter to new deserialization framework
The Connection::query_iter method is changed to use the new
deserialization framework. All the internal uses of it in topology.rs
are adjusted.

Co-authored-by: Piotr Dulikowski <piodul@scylladb.com>
wprzytula and piodul committed Nov 12, 2024
commit 6d9d9712a3dfeae5bf303550d1c5e630b718d5bd
15 changes: 9 additions & 6 deletions scylla/src/transport/connection.rs
wprzytula marked this conversation as resolved.
Show resolved Hide resolved
Original file line number Diff line number Diff line change
@@ -47,7 +47,7 @@ use std::{
};

use super::errors::{ProtocolError, SchemaVersionFetchError, UseKeyspaceProtocolError};
use super::iterator::{LegacyRowIterator, QueryPager};
use super::iterator::QueryPager;
use super::locator::tablets::{RawTablet, TabletParsingError};
use super::query_result::QueryResult;
use super::session::AddressTranslator;
@@ -1182,15 +1182,14 @@ impl Connection {
pub(crate) async fn query_iter(
self: Arc<Self>,
query: Query,
) -> Result<LegacyRowIterator, QueryError> {
) -> Result<QueryPager, QueryError> {
let consistency = query
.config
.determine_consistency(self.config.default_consistency);
let serial_consistency = query.config.serial_consistency.flatten();

QueryPager::new_for_connection_query_iter(query, self, consistency, serial_consistency)
.await
.map(QueryPager::into_legacy)
}

/// Executes a prepared statement and fetches its results over multiple pages, using
@@ -1199,7 +1198,7 @@ impl Connection {
self: Arc<Self>,
prepared_statement: PreparedStatement,
values: SerializedValues,
) -> Result<LegacyRowIterator, QueryError> {
) -> Result<QueryPager, QueryError> {
let consistency = prepared_statement
.config
.determine_consistency(self.config.default_consistency);
@@ -1213,7 +1212,6 @@ impl Connection {
serial_consistency,
)
.await
.map(QueryPager::into_legacy)
}

#[allow(dead_code)]
@@ -2479,6 +2477,8 @@ mod tests {
.query_iter(select_query.clone())
.await
.unwrap()
.rows_stream::<(i32,)>()
.unwrap()
.try_collect::<Vec<_>>()
.await
.unwrap();
@@ -2503,7 +2503,8 @@ mod tests {
.query_iter(select_query.clone())
.await
.unwrap()
.into_typed::<(i32,)>()
.rows_stream::<(i32,)>()
.unwrap()
.map(|ret| ret.unwrap().0)
.collect::<Vec<_>>()
.await;
@@ -2517,6 +2518,8 @@ mod tests {
))
.await
.unwrap()
.rows_stream::<()>()
.unwrap()
.try_collect::<Vec<_>>()
.await
.unwrap();
12 changes: 6 additions & 6 deletions scylla/src/transport/errors.rs
Original file line number Diff line number Diff line change
@@ -25,7 +25,7 @@ use scylla_cql::{
response::CqlResponseKind,
value::SerializeValuesError,
},
types::serialize::SerializationError,
types::{deserialize::TypeCheckError, serialize::SerializationError},
};

use thiserror::Error;
@@ -436,7 +436,7 @@ pub enum PeersMetadataError {
pub enum KeyspacesMetadataError {
/// system_schema.keyspaces has invalid column type.
#[error("system_schema.keyspaces has invalid column type: {0}")]
SchemaKeyspacesInvalidColumnType(FromRowError),
SchemaKeyspacesInvalidColumnType(TypeCheckError),

/// Bad keyspace replication strategy.
#[error("Bad keyspace <{keyspace}> replication strategy: {error}")]
@@ -474,7 +474,7 @@ pub enum KeyspaceStrategyError {
pub enum UdtMetadataError {
/// system_schema.types has invalid column type.
#[error("system_schema.types has invalid column type: {0}")]
SchemaTypesInvalidColumnType(FromRowError),
SchemaTypesInvalidColumnType(TypeCheckError),

/// Circular UDT dependency detected.
#[error("Detected circular dependency between user defined types - toposort is impossible!")]
@@ -487,11 +487,11 @@ pub enum UdtMetadataError {
pub enum TablesMetadataError {
/// system_schema.tables has invalid column type.
#[error("system_schema.tables has invalid column type: {0}")]
SchemaTablesInvalidColumnType(FromRowError),
SchemaTablesInvalidColumnType(TypeCheckError),

/// system_schema.columns has invalid column type.
#[error("system_schema.columns has invalid column type: {0}")]
SchemaColumnsInvalidColumnType(FromRowError),
SchemaColumnsInvalidColumnType(TypeCheckError),

/// Unknown column kind.
#[error("Unknown column kind '{column_kind}' for {keyspace_name}.{table_name}.{column_name}")]
@@ -509,7 +509,7 @@ pub enum TablesMetadataError {
pub enum ViewsMetadataError {
/// system_schema.views has invalid column type.
#[error("system_schema.views has invalid column type: {0}")]
SchemaViewsInvalidColumnType(FromRowError),
SchemaViewsInvalidColumnType(TypeCheckError),
}

/// Error caused by caller creating an invalid query
128 changes: 71 additions & 57 deletions scylla/src/transport/topology.rs
Original file line number Diff line number Diff line change
@@ -13,8 +13,10 @@ use futures::stream::{self, StreamExt, TryStreamExt};
use futures::Stream;
use rand::seq::SliceRandom;
use rand::{thread_rng, Rng};
use scylla_cql::frame::response::result::Row;
use scylla_macros::FromRow;
use scylla_cql::frame::frame_errors::RowsParseError;
use scylla_cql::types::deserialize::row::DeserializeRow;
use scylla_cql::types::deserialize::TypeCheckError;
use scylla_macros::DeserializeRow;
use std::borrow::BorrowMut;
use std::cell::Cell;
use std::collections::HashMap;
@@ -765,11 +767,13 @@ async fn query_metadata(
Ok(Metadata { peers, keyspaces })
}

#[derive(FromRow)]
#[scylla_crate = "scylla_cql"]
#[derive(DeserializeRow)]
#[scylla(crate = "scylla_cql")]
struct NodeInfoRow {
host_id: Option<Uuid>,
#[scylla(rename = "rpc_address")]
untranslated_ip_addr: IpAddr,
#[scylla(rename = "data_center")]
datacenter: Option<String>,
rack: Option<String>,
tokens: Option<Vec<String>>,
@@ -799,6 +803,13 @@ async fn query_peers(conn: &Arc<Connection>, connect_port: u16) -> Result<Vec<Pe
let peers_query_stream = conn
.clone()
.query_iter(peers_query)
.map(|pager_res| {
let pager = pager_res?;
let rows_stream = pager
.rows_stream::<NodeInfoRow>()
.map_err(RowsParseError::from)?;
Ok::<_, QueryError>(rows_stream)
})
.into_stream()
.try_flatten()
.and_then(|row_result| future::ok((NodeInfoSource::Peer, row_result)));
@@ -809,6 +820,13 @@ async fn query_peers(conn: &Arc<Connection>, connect_port: u16) -> Result<Vec<Pe
let local_query_stream = conn
.clone()
.query_iter(local_query)
.map(|pager_res| {
let pager = pager_res?;
let rows_stream = pager
.rows_stream::<NodeInfoRow>()
.map_err(RowsParseError::from)?;
Ok::<_, QueryError>(rows_stream)
})
.into_stream()
.try_flatten()
.and_then(|row_result| future::ok((NodeInfoSource::Local, row_result)));
@@ -819,9 +837,8 @@ async fn query_peers(conn: &Arc<Connection>, connect_port: u16) -> Result<Vec<Pe
let local_address = SocketAddr::new(local_ip, connect_port);

let translated_peers_futures = untranslated_rows.map(|row_result| async {
let (source, raw_row) = row_result?;
match raw_row.into_typed() {
Ok(row) => create_peer_from_row(source, row, local_address).await,
match row_result {
Ok((source, row)) => create_peer_from_row(source, row, local_address).await,
Err(err) => {
warn!(
"system.peers or system.local has an invalid row, skipping it: {}",
@@ -905,15 +922,19 @@ async fn create_peer_from_row(
}))
}

fn query_filter_keyspace_name<'a>(
fn query_filter_keyspace_name<'a, R>(
conn: &Arc<Connection>,
query_str: &'a str,
keyspaces_to_fetch: &'a [String],
) -> impl Stream<Item = Result<Row, QueryError>> + 'a {
convert_typecheck_error: impl FnOnce(TypeCheckError) -> MetadataError + 'a,
) -> impl Stream<Item = Result<R, QueryError>> + 'a
where
R: for<'r> DeserializeRow<'r, 'r> + 'static,
{
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This R bound basically means "deserializes to an owned type", right?
Maybe we could have a trait for that with supertrait for<'r> DeserializeRow<'r, 'r> + 'static?

Copy link
Collaborator Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I'm not a fan of this idea. For me, the fact that a type is 'static is completely independent of the DeserializeRow implementation, and we shouldn't add another name to mean both of those things. Mentioning this in a straightforward way is the clearest.

wprzytula marked this conversation as resolved.
Show resolved Hide resolved
let conn = conn.clone();

let fut = async move {
if keyspaces_to_fetch.is_empty() {
let pager = if keyspaces_to_fetch.is_empty() {
let mut query = Query::new(query_str);
query.set_page_size(METADATA_QUERY_PAGE_SIZE);

@@ -928,7 +949,11 @@ fn query_filter_keyspace_name<'a>(
let prepared = conn.prepare(&query).await?;
let serialized_values = prepared.serialize_values(&keyspaces)?;
conn.execute_iter(prepared, serialized_values).await
}
}?;

let stream: super::iterator::TypedRowStream<R> =
pager.rows_stream::<R>().map_err(convert_typecheck_error)?;
Ok::<_, QueryError>(stream)
};
fut.into_stream().try_flatten()
}
@@ -938,10 +963,15 @@ async fn query_keyspaces(
keyspaces_to_fetch: &[String],
fetch_schema: bool,
) -> Result<HashMap<String, Keyspace>, QueryError> {
let rows = query_filter_keyspace_name(
let rows = query_filter_keyspace_name::<(String, HashMap<String, String>)>(
conn,
"select keyspace_name, replication from system_schema.keyspaces",
keyspaces_to_fetch,
|err| {
MetadataError::Keyspaces(KeyspacesMetadataError::SchemaKeyspacesInvalidColumnType(
err,
))
},
);

let (mut all_tables, mut all_views, mut all_user_defined_types) = if fetch_schema {
@@ -956,12 +986,7 @@ async fn query_keyspaces(
};

rows.map(|row_result| {
let row = row_result?;
let (keyspace_name, strategy_map) = row.into_typed::<(String, _)>().map_err(|err| {
MetadataError::Keyspaces(KeyspacesMetadataError::SchemaKeyspacesInvalidColumnType(
err,
))
})?;
let (keyspace_name, strategy_map) = row_result?;

let strategy: Strategy = strategy_from_string_map(strategy_map).map_err(|error| {
MetadataError::Keyspaces(KeyspacesMetadataError::Strategy {
@@ -988,8 +1013,8 @@ async fn query_keyspaces(
.await
}

#[derive(FromRow, Debug)]
#[scylla_crate = "crate"]
#[derive(DeserializeRow, Debug)]
#[scylla(crate = "crate")]
struct UdtRow {
keyspace_name: String,
type_name: String,
@@ -1031,21 +1056,16 @@ async fn query_user_defined_types(
conn: &Arc<Connection>,
keyspaces_to_fetch: &[String],
) -> Result<HashMap<String, HashMap<String, Arc<UserDefinedType>>>, QueryError> {
let rows = query_filter_keyspace_name(
let rows = query_filter_keyspace_name::<UdtRow>(
conn,
"select keyspace_name, type_name, field_names, field_types from system_schema.types",
keyspaces_to_fetch,
|err| MetadataError::Udts(UdtMetadataError::SchemaTypesInvalidColumnType(err)),
);

let mut udt_rows: Vec<UdtRowWithParsedFieldTypes> = rows
.map(|row_result| {
let row = row_result?;
let udt_row = row
.into_typed::<UdtRow>()
.map_err(|err| {
MetadataError::Udts(UdtMetadataError::SchemaTypesInvalidColumnType(err))
})?
.try_into()?;
let udt_row = row_result?.try_into()?;

Ok::<_, QueryError>(udt_row)
})
@@ -1355,21 +1375,17 @@ async fn query_tables(
keyspaces_to_fetch: &[String],
udts: &HashMap<String, HashMap<String, Arc<UserDefinedType>>>,
) -> Result<HashMap<String, HashMap<String, Table>>, QueryError> {
let rows = query_filter_keyspace_name(
let rows = query_filter_keyspace_name::<(String, String)>(
conn,
"SELECT keyspace_name, table_name FROM system_schema.tables",
keyspaces_to_fetch,
|err| MetadataError::Tables(TablesMetadataError::SchemaTablesInvalidColumnType(err)),
);
let mut result = HashMap::new();
let mut tables = query_tables_schema(conn, keyspaces_to_fetch, udts).await?;

rows.map(|row_result| {
let row = row_result?;
let (keyspace_name, table_name) = row.into_typed().map_err(|err| {
MetadataError::Tables(TablesMetadataError::SchemaTablesInvalidColumnType(err))
})?;

let keyspace_and_table_name = (keyspace_name, table_name);
let keyspace_and_table_name = row_result?;

let table = tables.remove(&keyspace_and_table_name).unwrap_or(Table {
columns: HashMap::new(),
@@ -1396,20 +1412,18 @@ async fn query_views(
keyspaces_to_fetch: &[String],
udts: &HashMap<String, HashMap<String, Arc<UserDefinedType>>>,
) -> Result<HashMap<String, HashMap<String, MaterializedView>>, QueryError> {
let rows = query_filter_keyspace_name(
let rows = query_filter_keyspace_name::<(String, String, String)>(
conn,
"SELECT keyspace_name, view_name, base_table_name FROM system_schema.views",
keyspaces_to_fetch,
|err| MetadataError::Views(ViewsMetadataError::SchemaViewsInvalidColumnType(err)),
);

let mut result = HashMap::new();
let mut tables = query_tables_schema(conn, keyspaces_to_fetch, udts).await?;

rows.map(|row_result| {
let row = row_result?;
let (keyspace_name, view_name, base_table_name) = row.into_typed().map_err(|err| {
MetadataError::Views(ViewsMetadataError::SchemaViewsInvalidColumnType(err))
})?;
let (keyspace_name, view_name, base_table_name) = row_result?;

let keyspace_and_view_name = (keyspace_name, view_name);

@@ -1447,24 +1461,18 @@ async fn query_tables_schema(
// This column shouldn't be exposed to the user but is currently exposed in system tables.
const THRIFT_EMPTY_TYPE: &str = "empty";

let rows = query_filter_keyspace_name(conn,
"select keyspace_name, table_name, column_name, kind, position, type from system_schema.columns", keyspaces_to_fetch
type RowType = (String, String, String, String, i32, String);

let rows = query_filter_keyspace_name::<RowType>(conn,
"select keyspace_name, table_name, column_name, kind, position, type from system_schema.columns", keyspaces_to_fetch, |err| {
MetadataError::Tables(TablesMetadataError::SchemaColumnsInvalidColumnType(err))
}
);

let mut tables_schema = HashMap::new();

rows.map(|row_result| {
let row = row_result?;
let (keyspace_name, table_name, column_name, kind, position, type_): (
String,
String,
String,
String,
i32,
String,
) = row.into_typed().map_err(|err| {
MetadataError::Tables(TablesMetadataError::SchemaColumnsInvalidColumnType(err))
})?;
let (keyspace_name, table_name, column_name, kind, position, type_) = row_result?;

if type_ == THRIFT_EMPTY_TYPE {
return Ok::<_, QueryError>(());
@@ -1674,15 +1682,21 @@ async fn query_table_partitioners(
let rows = conn
.clone()
.query_iter(partitioner_query)
.map(|pager_res| {
let pager = pager_res?;
let stream = pager
.rows_stream::<(String, String, Option<String>)>()
.map_err(|err| {
MetadataError::Tables(TablesMetadataError::SchemaTablesInvalidColumnType(err))
})?;
Ok::<_, QueryError>(stream)
})
.into_stream()
.try_flatten();

let result = rows
.map(|row_result| {
let (keyspace_name, table_name, partitioner) =
row_result?.into_typed().map_err(|err| {
MetadataError::Tables(TablesMetadataError::SchemaTablesInvalidColumnType(err))
})?;
let (keyspace_name, table_name, partitioner) = row_result?;
Ok::<_, QueryError>(((keyspace_name, table_name), partitioner))
})
.try_collect::<HashMap<_, _>>()