Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Introduce new deserialization API #1057

Merged
merged 25 commits into from
Nov 12, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
Show all changes
25 commits
Select commit Hold shift + click to select a range
51c1005
iterator: fix QueryPager docstring
wprzytula Nov 7, 2024
e67e5da
query_result: fix QueryRowsResult's docstrings
wprzytula Nov 10, 2024
463b9b5
treewide: rename Session to LegacySession
piodul Mar 15, 2023
ed57255
session: make generic and introduce "session kind" parameter
piodul Mar 16, 2023
8e36957
session: move query-related methods to a separate block
piodul Mar 16, 2023
6daf833
session: re-introduce the Session type as an alias
piodul Mar 16, 2023
c1416dd
session_builder: rename build->build_legacy and then reintroduce
piodul Mar 16, 2023
e9d4719
tests: scylla_supports_tablets[_legacy] suffix
wprzytula Aug 8, 2024
37ff7c6
session: partly de-genericise internal query/exec functions
piodul Mar 13, 2023
c229ae5
session: return new QueryResult from internal methods
piodul Mar 13, 2023
22f28cd
session: add interface methods for the new deser API
piodul Mar 13, 2023
2ec2885
connection: switch to the new deserialization framework
piodul Mar 14, 2023
b3f4a04
caching_session: make generic over session APIs
piodul Mar 14, 2023
001b5bb
caching_session: fix docstring references
wprzytula Nov 6, 2024
db6bee0
caching_session: modernize tests
piodul Mar 14, 2023
6d9d971
connection: migrate query_iter to new deserialization framework
wprzytula Mar 12, 2024
f3aae01
topology: reduce `query_filter_keyspace_name` monomorphisation penalty
wprzytula Nov 7, 2024
2b5f386
{session,tracing}: switch to the new deser framework for tracing info
piodul Mar 17, 2023
5919cf9
treewide: switch tests to use the new framework
wprzytula Mar 12, 2024
9a092f9
examples: adjust to use the new interface
piodul Mar 14, 2023
a204a7b
codewide: migrate doctests to new deser API
wprzytula Aug 14, 2024
e99b875
session_test: regression test empty collections deserialization
wprzytula May 21, 2024
f1e7e02
codewide: introduce DeserializeOwned{Row,Value}
wprzytula Nov 10, 2024
98b382d
iterator: rename RowIteratorWorker to PagerWorker
wprzytula Nov 12, 2024
d4a222c
iterator: fix QueryPager::rows_stream() lifetime constraints
wprzytula Nov 12, 2024
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
3 changes: 2 additions & 1 deletion examples/allocations.rs
Original file line number Diff line number Diff line change
@@ -1,5 +1,6 @@
use anyhow::Result;
use scylla::{statement::prepared_statement::PreparedStatement, Session, SessionBuilder};
use scylla::transport::session::Session;
use scylla::{statement::prepared_statement::PreparedStatement, SessionBuilder};
use std::io::Write;
use std::sync::atomic::{AtomicUsize, Ordering};
use std::sync::Arc;
Expand Down
29 changes: 15 additions & 14 deletions examples/basic.rs
Original file line number Diff line number Diff line change
@@ -1,7 +1,9 @@
use anyhow::Result;
use futures::TryStreamExt;
use scylla::macros::FromRow;
use futures::StreamExt as _;
use futures::TryStreamExt as _;
use scylla::frame::response::result::Row;
use scylla::transport::session::Session;
use scylla::DeserializeRow;
use scylla::SessionBuilder;
use std::env;

Expand Down Expand Up @@ -53,39 +55,38 @@ async fn main() -> Result<()> {
let mut iter = session
.query_iter("SELECT a, b, c FROM examples_ks.basic", &[])
.await?
.into_typed::<(i32, i32, String)>();
.rows_stream::<(i32, i32, String)>()?;
while let Some((a, b, c)) = iter.try_next().await? {
println!("a, b, c: {}, {}, {}", a, b, c);
}
Lorak-mmk marked this conversation as resolved.
Show resolved Hide resolved

// Or as custom structs that derive FromRow
#[derive(Debug, FromRow)]
// Or as custom structs that derive DeserializeRow
#[allow(unused)]
#[derive(Debug, DeserializeRow)]
struct RowData {
_a: i32,
_b: Option<i32>,
_c: String,
a: i32,
b: Option<i32>,
c: String,
}

let mut iter = session
.query_iter("SELECT a, b, c FROM examples_ks.basic", &[])
.await?
.into_typed::<RowData>();
.rows_stream::<RowData>()?;
while let Some(row_data) = iter.try_next().await? {
println!("row_data: {:?}", row_data);
}

// Or simply as untyped rows
let mut iter = session
.query_iter("SELECT a, b, c FROM examples_ks.basic", &[])
.await?;
while let Some(row) = iter.try_next().await? {
.await?
.rows_stream::<Row>()?;
while let Some(row) = iter.next().await.transpose()? {
let a = row.columns[0].as_ref().unwrap().as_int().unwrap();
let b = row.columns[1].as_ref().unwrap().as_int().unwrap();
let c = row.columns[2].as_ref().unwrap().as_text().unwrap();
println!("a, b, c: {}, {}, {}", a, b, c);

// Alternatively each row can be parsed individually
// let (a2, b2, c2) = row.into_typed::<(i32, i32, String)>() ?;
}

let metrics = session.get_metrics();
Expand Down
4 changes: 3 additions & 1 deletion examples/compare-tokens.rs
Original file line number Diff line number Diff line change
Expand Up @@ -51,7 +51,9 @@ async fn main() -> Result<()> {
(pk,),
)
.await?
.single_row_typed::<(i64,)>()?;
.into_rows_result()?
.expect("Got not Rows result")
.single_row()?;
assert_eq!(t, qt);
println!("token for {}: {}", pk, t);
}
Expand Down
20 changes: 10 additions & 10 deletions examples/cql-time-types.rs
Original file line number Diff line number Diff line change
Expand Up @@ -3,7 +3,7 @@

use anyhow::Result;
use chrono::{DateTime, NaiveDate, NaiveTime, Utc};
use futures::{StreamExt, TryStreamExt};
use futures::{StreamExt as _, TryStreamExt as _};
use scylla::frame::response::result::CqlValue;
use scylla::frame::value::{CqlDate, CqlTime, CqlTimestamp};
use scylla::transport::session::Session;
Expand Down Expand Up @@ -44,7 +44,7 @@ async fn main() -> Result<()> {
let mut iter = session
.query_iter("SELECT d from examples_ks.dates", &[])
.await?
.into_typed::<(NaiveDate,)>();
.rows_stream::<(NaiveDate,)>()?;
while let Some(row_result) = iter.next().await {
let (read_date,): (NaiveDate,) = match row_result {
Ok(read_date) => read_date,
Expand All @@ -66,7 +66,7 @@ async fn main() -> Result<()> {
let mut iter = session
.query_iter("SELECT d from examples_ks.dates", &[])
.await?
.into_typed::<(time::Date,)>();
.rows_stream::<(time::Date,)>()?;
while let Some(row_result) = iter.next().await {
let (read_date,): (time::Date,) = match row_result {
Ok(read_date) => read_date,
Expand All @@ -88,7 +88,7 @@ async fn main() -> Result<()> {
let mut iter = session
.query_iter("SELECT d from examples_ks.dates", &[])
.await?
.into_typed::<(CqlValue,)>();
.rows_stream::<(CqlValue,)>()?;
while let Some(row_result) = iter.next().await {
let read_days: u32 = match row_result {
Ok((CqlValue::Date(CqlDate(days)),)) => days,
Expand Down Expand Up @@ -124,7 +124,7 @@ async fn main() -> Result<()> {
let mut iter = session
.query_iter("SELECT d from examples_ks.times", &[])
.await?
.into_typed::<(NaiveTime,)>();
.rows_stream::<(NaiveTime,)>()?;
while let Some((read_time,)) = iter.try_next().await? {
println!("Parsed a time into chrono::NaiveTime: {:?}", read_time);
}
Expand All @@ -139,7 +139,7 @@ async fn main() -> Result<()> {
let mut iter = session
.query_iter("SELECT d from examples_ks.times", &[])
.await?
.into_typed::<(time::Time,)>();
.rows_stream::<(time::Time,)>()?;
while let Some((read_time,)) = iter.try_next().await? {
println!("Parsed a time into time::Time: {:?}", read_time);
}
Expand All @@ -154,7 +154,7 @@ async fn main() -> Result<()> {
let mut iter = session
.query_iter("SELECT d from examples_ks.times", &[])
.await?
.into_typed::<(CqlTime,)>();
.rows_stream::<(CqlTime,)>()?;
while let Some((read_time,)) = iter.try_next().await? {
println!("Read a time as raw nanos: {:?}", read_time);
}
Expand Down Expand Up @@ -185,7 +185,7 @@ async fn main() -> Result<()> {
let mut iter = session
.query_iter("SELECT d from examples_ks.timestamps", &[])
.await?
.into_typed::<(DateTime<Utc>,)>();
.rows_stream::<(DateTime<Utc>,)>()?;
while let Some((read_time,)) = iter.try_next().await? {
println!(
"Parsed a timestamp into chrono::DateTime<chrono::Utc>: {:?}",
Expand All @@ -206,7 +206,7 @@ async fn main() -> Result<()> {
let mut iter = session
.query_iter("SELECT d from examples_ks.timestamps", &[])
.await?
.into_typed::<(time::OffsetDateTime,)>();
.rows_stream::<(time::OffsetDateTime,)>()?;
while let Some((read_time,)) = iter.try_next().await? {
println!(
"Parsed a timestamp into time::OffsetDateTime: {:?}",
Expand All @@ -227,7 +227,7 @@ async fn main() -> Result<()> {
let mut iter = session
.query_iter("SELECT d from examples_ks.timestamps", &[])
.await?
.into_typed::<(CqlTimestamp,)>();
.rows_stream::<(CqlTimestamp,)>()?;
while let Some((read_time,)) = iter.try_next().await? {
println!("Read a timestamp as raw millis: {:?}", read_time);
}
Expand Down
43 changes: 25 additions & 18 deletions examples/cqlsh-rs.rs
Original file line number Diff line number Diff line change
Expand Up @@ -3,8 +3,11 @@ use rustyline::completion::{Completer, Pair};
use rustyline::error::ReadlineError;
use rustyline::{CompletionType, Config, Context, Editor};
use rustyline_derive::{Helper, Highlighter, Hinter, Validator};
use scylla::frame::response::result::Row;
use scylla::transport::session::Session;
use scylla::transport::Compression;
use scylla::{LegacyQueryResult, Session, SessionBuilder};
use scylla::QueryRowsResult;
use scylla::SessionBuilder;
use std::env;

#[derive(Helper, Highlighter, Validator, Hinter)]
Expand Down Expand Up @@ -173,23 +176,24 @@ impl Completer for CqlHelper {
}
}

fn print_result(result: &LegacyQueryResult) {
if result.rows.is_none() {
println!("OK");
return;
}
for row in result.rows.as_ref().unwrap() {
for column in &row.columns {
print!("|");
print!(
" {:16}",
match column {
None => "null".to_owned(),
Some(value) => format!("{:?}", value),
}
);
fn print_result(result: Option<&QueryRowsResult>) {
if let Some(rows_result) = result {
for row in rows_result.rows::<Row>().unwrap() {
let row = row.unwrap();
for column in &row.columns {
print!("|");
print!(
" {:16}",
match column {
None => "null".to_owned(),
Some(value) => format!("{:?}", value),
}
);
}
println!("|")
}
println!("|")
} else {
println!("OK");
}
}

Expand Down Expand Up @@ -222,7 +226,10 @@ async fn main() -> Result<()> {
let maybe_res = session.query_unpaged(line, &[]).await;
match maybe_res {
Err(err) => println!("Error: {}", err),
Ok(res) => print_result(&res),
Ok(res) => {
let rows_res = res.into_rows_result()?;
print_result(rows_res.as_ref())
}
}
}
Err(ReadlineError::Interrupted) => continue,
Expand Down
63 changes: 24 additions & 39 deletions examples/custom_deserialization.rs
Original file line number Diff line number Diff line change
@@ -1,8 +1,8 @@
use anyhow::Result;
use scylla::cql_to_rust::{FromCqlVal, FromCqlValError};
use scylla::frame::response::result::CqlValue;
use scylla::macros::impl_from_cql_value_from_method;
use scylla::{Session, SessionBuilder};
use anyhow::{Context, Result};
use scylla::deserialize::DeserializeValue;
use scylla::frame::response::result::ColumnType;
use scylla::transport::session::Session;
use scylla::SessionBuilder;
use std::env;

#[tokio::main]
Expand All @@ -28,53 +28,38 @@ async fn main() -> Result<()> {
)
.await?;

// You can implement FromCqlVal for your own types
// You can implement DeserializeValue for your own types
#[derive(PartialEq, Eq, Debug)]
struct MyType(String);
struct MyType<'a>(&'a str);

impl FromCqlVal<CqlValue> for MyType {
fn from_cql(cql_val: CqlValue) -> Result<Self, FromCqlValError> {
Ok(Self(
cql_val.into_string().ok_or(FromCqlValError::BadCqlType)?,
))
impl<'frame, 'metadata> DeserializeValue<'frame, 'metadata> for MyType<'frame> {
fn type_check(
typ: &scylla::frame::response::result::ColumnType,
) -> std::result::Result<(), scylla::deserialize::TypeCheckError> {
<&str as DeserializeValue<'frame, 'metadata>>::type_check(typ)
}
}

let (v,) = session
.query_unpaged(
"SELECT v FROM examples_ks.custom_deserialization WHERE pk = 1",
(),
)
.await?
.single_row_typed::<(MyType,)>()?;
assert_eq!(v, MyType("asdf".to_owned()));

// If you defined an extension trait for CqlValue then you can use
// the `impl_from_cql_value_from_method` macro to turn it into
// a FromCqlValue impl
#[derive(PartialEq, Eq, Debug)]
struct MyOtherType(String);

trait CqlValueExt {
fn into_my_other_type(self) -> Option<MyOtherType>;
}
fn deserialize(
typ: &'metadata ColumnType<'metadata>,
v: Option<scylla::deserialize::FrameSlice<'frame>>,
) -> std::result::Result<Self, scylla::deserialize::DeserializationError> {
let s = <&str as DeserializeValue<'frame, 'metadata>>::deserialize(typ, v)?;

impl CqlValueExt for CqlValue {
fn into_my_other_type(self) -> Option<MyOtherType> {
Some(MyOtherType(self.into_string()?))
Ok(Self(s))
}
}

impl_from_cql_value_from_method!(MyOtherType, into_my_other_type);

let (v,) = session
let rows_result = session
.query_unpaged(
"SELECT v FROM examples_ks.custom_deserialization WHERE pk = 1",
(),
)
.await?
.single_row_typed::<(MyOtherType,)>()?;
assert_eq!(v, MyOtherType("asdf".to_owned()));
.into_rows_result()?
.context("Expected Result:Rows response, got a different Result response.")?;

let (v,) = rows_result.single_row::<(MyType,)>()?;
assert_eq!(v, MyType("asdf"));

println!("Ok.");

Expand Down
25 changes: 17 additions & 8 deletions examples/get_by_name.rs
Original file line number Diff line number Diff line change
@@ -1,4 +1,5 @@
use anyhow::{anyhow, Result};
use anyhow::{anyhow, Context as _, Result};
use scylla::frame::response::result::Row;
use scylla::transport::session::Session;
use scylla::SessionBuilder;
use std::env;
Expand Down Expand Up @@ -35,18 +36,26 @@ async fn main() -> Result<()> {
)
.await?;

let query_result = session
let rows_result = session
.query_unpaged("SELECT pk, ck, value FROM examples_ks.get_by_name", &[])
.await?;
let (ck_idx, _) = query_result
.get_column_spec("ck")
.await?
.into_rows_result()?
.context("Response is not of Rows type")?;
let col_specs = rows_result.column_specs();
let (ck_idx, _) = col_specs
.get_by_name("ck")
.ok_or_else(|| anyhow!("No ck column found"))?;
let (value_idx, _) = query_result
.get_column_spec("value")
let (value_idx, _) = col_specs
.get_by_name("value")
.ok_or_else(|| anyhow!("No value column found"))?;
let rows = rows_result
.rows::<Row>()
.unwrap()
.collect::<Result<Vec<_>, _>>()
.unwrap();
println!("ck | value");
println!("---------------------");
for row in query_result.rows.ok_or_else(|| anyhow!("no rows found"))? {
for row in rows {
println!("{:?} | {:?}", row.columns[ck_idx], row.columns[value_idx]);
}

Expand Down
3 changes: 1 addition & 2 deletions examples/logging_log.rs
Original file line number Diff line number Diff line change
@@ -1,6 +1,5 @@
use anyhow::Result;
use scylla::transport::session::Session;
use scylla::SessionBuilder;
use scylla::{Session, SessionBuilder};
use std::env;
use tracing::info;

Expand Down
8 changes: 6 additions & 2 deletions examples/query_history.rs
Original file line number Diff line number Diff line change
@@ -1,7 +1,8 @@
//! This example shows how to collect history of query execution.

use anyhow::Result;
use futures::StreamExt;
use futures::StreamExt as _;
use scylla::frame::response::result::Row;
use scylla::history::{HistoryCollector, StructuredHistory};
use scylla::query::Query;
use scylla::transport::session::Session;
Expand Down Expand Up @@ -59,7 +60,10 @@ async fn main() -> Result<()> {
let iter_history_listener = Arc::new(HistoryCollector::new());
iter_query.set_history_listener(iter_history_listener.clone());

let mut rows_iterator = session.query_iter(iter_query, ()).await?;
let mut rows_iterator = session
.query_iter(iter_query, ())
.await?
.rows_stream::<Row>()?;
while let Some(_row) = rows_iterator.next().await {
// Receive rows...
}
Expand Down
Loading
Loading