diff --git a/examples/basic.rs b/examples/basic.rs index c8e5510d80..72b6a5ce11 100644 --- a/examples/basic.rs +++ b/examples/basic.rs @@ -1,4 +1,5 @@ use anyhow::Result; +use futures::TryStreamExt; use scylla::macros::FromRow; use scylla::transport::session::Session; use scylla::SessionBuilder; @@ -49,11 +50,11 @@ async fn main() -> Result<()> { .await?; // Rows can be parsed as tuples - let result = session - .query_unpaged("SELECT a, b, c FROM examples_ks.basic", &[]) - .await?; - let mut iter = result.rows_typed::<(i32, i32, String)>()?; - while let Some((a, b, c)) = iter.next().transpose()? { + let mut iter = session + .query_iter("SELECT a, b, c FROM examples_ks.basic", &[]) + .await? + .into_typed::<(i32, i32, String)>(); + while let Some((a, b, c)) = iter.try_next().await? { println!("a, b, c: {}, {}, {}", a, b, c); } @@ -65,20 +66,19 @@ async fn main() -> Result<()> { _c: String, } - let result = session - .query_unpaged("SELECT a, b, c FROM examples_ks.basic", &[]) - .await?; - let mut iter = result.rows_typed::()?; - while let Some(row_data) = iter.next().transpose()? { + let mut iter = session + .query_iter("SELECT a, b, c FROM examples_ks.basic", &[]) + .await? + .into_typed::(); + while let Some(row_data) = iter.try_next().await? { println!("row_data: {:?}", row_data); } // Or simply as untyped rows - let result = session - .query_unpaged("SELECT a, b, c FROM examples_ks.basic", &[]) + let mut iter = session + .query_iter("SELECT a, b, c FROM examples_ks.basic", &[]) .await?; - let rows = result.rows.unwrap(); - for row in rows { + while let Some(row) = iter.try_next().await? { let a = row.columns[0].as_ref().unwrap().as_int().unwrap(); let b = row.columns[1].as_ref().unwrap().as_int().unwrap(); let c = row.columns[2].as_ref().unwrap().as_text().unwrap(); diff --git a/examples/cql-time-types.rs b/examples/cql-time-types.rs index e7e24725d8..548ac69878 100644 --- a/examples/cql-time-types.rs +++ b/examples/cql-time-types.rs @@ -3,6 +3,7 @@ use anyhow::Result; use chrono::{DateTime, NaiveDate, NaiveTime, Utc}; +use futures::{StreamExt, TryStreamExt}; use scylla::frame::response::result::CqlValue; use scylla::frame::value::{CqlDate, CqlTime, CqlTimestamp}; use scylla::transport::session::Session; @@ -40,11 +41,12 @@ async fn main() -> Result<()> { ) .await?; - let result = session - .query_unpaged("SELECT d from examples_ks.dates", &[]) - .await?; - for row in result.rows_typed::<(NaiveDate,)>()? { - let (read_date,): (NaiveDate,) = match row { + let mut iter = session + .query_iter("SELECT d from examples_ks.dates", &[]) + .await? + .into_typed::<(NaiveDate,)>(); + while let Some(row_result) = iter.next().await { + let (read_date,): (NaiveDate,) = match row_result { Ok(read_date) => read_date, Err(_) => continue, // We might read a date that does not fit in NaiveDate, skip it }; @@ -61,11 +63,12 @@ async fn main() -> Result<()> { .query_unpaged("INSERT INTO examples_ks.dates (d) VALUES (?)", (time_date,)) .await?; - let result = session - .query_unpaged("SELECT d from examples_ks.dates", &[]) - .await?; - for row in result.rows_typed::<(time::Date,)>()? { - let (read_date,) = match row { + let mut iter = session + .query_iter("SELECT d from examples_ks.dates", &[]) + .await? + .into_typed::<(time::Date,)>(); + while let Some(row_result) = iter.next().await { + let (read_date,): (time::Date,) = match row_result { Ok(read_date) => read_date, Err(_) => continue, // We might read a date that does not fit in time::Date, skip it }; @@ -82,13 +85,13 @@ async fn main() -> Result<()> { ) .await?; - let result = session - .query_unpaged("SELECT d from examples_ks.dates", &[]) - .await?; - let mut iter = result.rows_typed::<(CqlValue,)>()?; - while let Some((value,)) = iter.next().transpose()? { - let read_days: u32 = match value { - CqlValue::Date(CqlDate(days)) => days, + let mut iter = session + .query_iter("SELECT d from examples_ks.dates", &[]) + .await? + .into_typed::<(CqlValue,)>(); + while let Some(row_result) = iter.next().await { + let read_days: u32 = match row_result { + Ok((CqlValue::Date(CqlDate(days)),)) => days, _ => panic!("oh no"), }; @@ -118,11 +121,11 @@ async fn main() -> Result<()> { ) .await?; - let result = session - .query_unpaged("SELECT t from examples_ks.times", &[]) - .await?; - let mut iter = result.rows_typed::<(NaiveTime,)>()?; - while let Some((read_time,)) = iter.next().transpose()? { + let mut iter = session + .query_iter("SELECT d from examples_ks.times", &[]) + .await? + .into_typed::<(NaiveTime,)>(); + while let Some((read_time,)) = iter.try_next().await? { println!("Parsed a time into chrono::NaiveTime: {:?}", read_time); } @@ -133,11 +136,11 @@ async fn main() -> Result<()> { .query_unpaged("INSERT INTO examples_ks.times (t) VALUES (?)", (time_time,)) .await?; - let result = session - .query_unpaged("SELECT t from examples_ks.times", &[]) - .await?; - let mut iter = result.rows_typed::<(time::Time,)>()?; - while let Some((read_time,)) = iter.next().transpose()? { + let mut iter = session + .query_iter("SELECT d from examples_ks.times", &[]) + .await? + .into_typed::<(time::Time,)>(); + while let Some((read_time,)) = iter.try_next().await? { println!("Parsed a time into time::Time: {:?}", read_time); } @@ -148,11 +151,11 @@ async fn main() -> Result<()> { .query_unpaged("INSERT INTO examples_ks.times (t) VALUES (?)", (time_time,)) .await?; - let result = session - .query_unpaged("SELECT t from examples_ks.times", &[]) - .await?; - let mut iter = result.rows_typed::<(CqlTime,)>()?; - while let Some((read_time,)) = iter.next().transpose()? { + let mut iter = session + .query_iter("SELECT d from examples_ks.times", &[]) + .await? + .into_typed::<(CqlTime,)>(); + while let Some((read_time,)) = iter.try_next().await? { println!("Read a time as raw nanos: {:?}", read_time); } @@ -179,11 +182,11 @@ async fn main() -> Result<()> { ) .await?; - let result = session - .query_unpaged("SELECT t from examples_ks.timestamps", &[]) - .await?; - let mut iter = result.rows_typed::<(DateTime,)>()?; - while let Some((read_time,)) = iter.next().transpose()? { + let mut iter = session + .query_iter("SELECT d from examples_ks.timestamps", &[]) + .await? + .into_typed::<(DateTime,)>(); + while let Some((read_time,)) = iter.try_next().await? { println!( "Parsed a timestamp into chrono::DateTime: {:?}", read_time @@ -200,11 +203,11 @@ async fn main() -> Result<()> { ) .await?; - let result = session - .query_unpaged("SELECT t from examples_ks.timestamps", &[]) - .await?; - let mut iter = result.rows_typed::<(time::OffsetDateTime,)>()?; - while let Some((read_time,)) = iter.next().transpose()? { + let mut iter = session + .query_iter("SELECT d from examples_ks.timestamps", &[]) + .await? + .into_typed::<(time::OffsetDateTime,)>(); + while let Some((read_time,)) = iter.try_next().await? { println!( "Parsed a timestamp into time::OffsetDateTime: {:?}", read_time @@ -221,11 +224,11 @@ async fn main() -> Result<()> { ) .await?; - let result = session - .query_unpaged("SELECT t from examples_ks.timestamps", &[]) - .await?; - let mut iter = result.rows_typed::<(CqlTimestamp,)>()?; - while let Some((read_time,)) = iter.next().transpose()? { + let mut iter = session + .query_iter("SELECT d from examples_ks.timestamps", &[]) + .await? + .into_typed::<(CqlTimestamp,)>(); + while let Some((read_time,)) = iter.try_next().await? { println!("Read a timestamp as raw millis: {:?}", read_time); } diff --git a/examples/schema_agreement.rs b/examples/schema_agreement.rs index c337f083ec..4709873fc4 100644 --- a/examples/schema_agreement.rs +++ b/examples/schema_agreement.rs @@ -1,4 +1,5 @@ use anyhow::{bail, Result}; +use futures::TryStreamExt; use scylla::transport::errors::QueryError; use scylla::transport::session::Session; use scylla::SessionBuilder; @@ -66,11 +67,11 @@ async fn main() -> Result<()> { .await?; // Rows can be parsed as tuples - let result = session - .query_unpaged("SELECT a, b, c FROM examples_ks.schema_agreement", &[]) - .await?; - let mut iter = result.rows_typed::<(i32, i32, String)>()?; - while let Some((a, b, c)) = iter.next().transpose()? { + let mut iter = session + .query_iter("SELECT a, b, c FROM examples_ks.schema_agreement", &[]) + .await? + .into_typed::<(i32, i32, String)>(); + while let Some((a, b, c)) = iter.try_next().await? { println!("a, b, c: {}, {}, {}", a, b, c); } diff --git a/examples/tls.rs b/examples/tls.rs index b011e1ff30..0671352147 100644 --- a/examples/tls.rs +++ b/examples/tls.rs @@ -1,4 +1,5 @@ use anyhow::Result; +use futures::TryStreamExt; use scylla::transport::session::Session; use scylla::SessionBuilder; use std::env; @@ -86,11 +87,11 @@ async fn main() -> Result<()> { .await?; // Rows can be parsed as tuples - let result = session - .query_unpaged("SELECT a, b, c FROM examples_ks.tls", &[]) - .await?; - let mut iter = result.rows_typed::<(i32, i32, String)>()?; - while let Some((a, b, c)) = iter.next().transpose()? { + let mut iter = session + .query_iter("SELECT a, b, c FROM examples_ks.tls", &[]) + .await? + .into_typed::<(i32, i32, String)>(); + while let Some((a, b, c)) = iter.try_next().await? { println!("a, b, c: {}, {}, {}", a, b, c); } diff --git a/examples/user-defined-type.rs b/examples/user-defined-type.rs index dd16346996..6e2d65286c 100644 --- a/examples/user-defined-type.rs +++ b/examples/user-defined-type.rs @@ -1,4 +1,5 @@ use anyhow::Result; +use futures::TryStreamExt; use scylla::macros::FromUserType; use scylla::{SerializeValue, Session, SessionBuilder}; use std::env; @@ -49,11 +50,14 @@ async fn main() -> Result<()> { .await?; // And read like any normal value - let result = session - .query_unpaged("SELECT my FROM examples_ks.user_defined_type_table", &[]) - .await?; - let mut iter = result.rows_typed::<(MyType,)>()?; - while let Some((my_val,)) = iter.next().transpose()? { + let mut iter = session + .query_iter( + "SELECT a, b, c FROM examples_ks.user_defined_type_table", + &[], + ) + .await? + .into_typed::<(MyType,)>(); + while let Some((my_val,)) = iter.try_next().await? { println!("{:?}", my_val); } diff --git a/examples/value_list.rs b/examples/value_list.rs index 54a0b0bed3..81568baeef 100644 --- a/examples/value_list.rs +++ b/examples/value_list.rs @@ -1,23 +1,23 @@ +use anyhow::Result; use scylla::{Session, SessionBuilder}; use std::env; #[tokio::main] -async fn main() { +async fn main() -> Result<()> { let uri = env::var("SCYLLA_URI").unwrap_or_else(|_| "127.0.0.1:9042".to_string()); println!("Connecting to {} ...", uri); - let session: Session = SessionBuilder::new().known_node(uri).build().await.unwrap(); + let session: Session = SessionBuilder::new().known_node(uri).build().await?; - session.query_unpaged("CREATE KEYSPACE IF NOT EXISTS examples_ks WITH REPLICATION = {'class' : 'NetworkTopologyStrategy', 'replication_factor' : 1}", &[]).await.unwrap(); + session.query_unpaged("CREATE KEYSPACE IF NOT EXISTS examples_ks WITH REPLICATION = {'class' : 'NetworkTopologyStrategy', 'replication_factor' : 1}", &[]).await?; session .query_unpaged( "CREATE TABLE IF NOT EXISTS examples_ks.my_type (k int, my text, primary key (k))", &[], ) - .await - .unwrap(); + .await?; #[derive(scylla::SerializeRow)] struct MyType<'a> { @@ -35,8 +35,7 @@ async fn main() { "INSERT INTO examples_ks.my_type (k, my) VALUES (?, ?)", to_insert, ) - .await - .unwrap(); + .await?; // You can also use type generics: #[derive(scylla::SerializeRow)] @@ -55,13 +54,13 @@ async fn main() { "INSERT INTO examples_ks.my_type (k, my) VALUES (?, ?)", to_insert_2, ) - .await - .unwrap(); + .await?; let q = session .query_unpaged("SELECT * FROM examples_ks.my_type", &[]) - .await - .unwrap(); + .await?; println!("Q: {:?}", q.rows); + + Ok(()) }