From 9ee0ee1ac9801760ab249b125a74380073bc7b0c Mon Sep 17 00:00:00 2001 From: Piotr Dulikowski Date: Wed, 1 Feb 2023 12:01:52 +0100 Subject: [PATCH] treewide: get rid of (most) uses of QueryResult::rows MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit This commit serves two purposes: - Modernizes the existing examples and internal uses of QueryResult in the driver. We have had helpers such as rows_typed etc. for a long time and they are supposed to offer superior experience, although many of our tests still use QueryResult::rows directly. - Prepares for the iterator-based deserialization refactor. The representation of QueryResult will change and the rows field will not be available anymore - instead, the helper methods very similar to the current ones will be mandatory. Co-authored-by: Wojciech Przytuła --- README.md | 9 +- docs/source/data-types/blob.md | 8 +- docs/source/data-types/collections.md | 48 +++--- docs/source/data-types/counter.md | 10 +- docs/source/data-types/date.md | 24 +-- docs/source/data-types/decimal.md | 8 +- docs/source/data-types/duration.md | 12 +- docs/source/data-types/inet.md | 8 +- docs/source/data-types/primitive.md | 67 ++++---- docs/source/data-types/text.md | 8 +- docs/source/data-types/time.md | 24 +-- docs/source/data-types/timestamp.md | 24 +-- docs/source/data-types/tuple.md | 16 +- docs/source/data-types/udt.md | 8 +- docs/source/data-types/uuid.md | 8 +- docs/source/data-types/varint.md | 8 +- docs/source/queries/simple.md | 10 +- docs/source/quickstart/example.md | 10 +- examples/basic.rs | 45 ++---- examples/compare-tokens.rs | 19 +-- examples/cql-time-types.rs | 162 ++++++++----------- examples/schema_agreement.rs | 16 +- examples/select-paging.rs | 12 +- examples/tls.rs | 16 +- examples/user-defined-type.rs | 15 +- scylla/src/transport/caching_session.rs | 6 +- scylla/src/transport/connection.rs | 20 ++- scylla/src/transport/cql_collections_test.rs | 8 +- scylla/src/transport/cql_types_test.rs | 103 +++--------- scylla/src/transport/session.rs | 35 ++-- scylla/src/transport/session_test.rs | 129 +++++---------- 31 files changed, 345 insertions(+), 551 deletions(-) diff --git a/README.md b/README.md index a6d2f27bf1..02364982b4 100644 --- a/README.md +++ b/README.md @@ -19,11 +19,10 @@ let uri = "127.0.0.1:9042"; let session: Session = SessionBuilder::new().known_node(uri).build().await?; -if let Some(rows) = session.query("SELECT a, b, c FROM ks.t", &[]).await?.rows { - for row in rows.into_typed::<(i32, i32, String)>() { - let (a, b, c) = row?; - println!("a, b, c: {}, {}, {}", a, b, c); - } +let result = session.query("SELECT a, b, c FROM ks.t", &[]).await?; +let mut iter = result.rows_typed::<(i32, i32, String)>()?; +while let Some((a, b, c)) = iter.next().transpose()? { + println!("a, b, c: {}, {}, {}", a, b, c); } ``` diff --git a/docs/source/data-types/blob.md b/docs/source/data-types/blob.md index c213da882c..83ef1306e8 100644 --- a/docs/source/data-types/blob.md +++ b/docs/source/data-types/blob.md @@ -17,10 +17,10 @@ session .await?; // Read blobs from the table -if let Some(rows) = session.query("SELECT a FROM keyspace.table", &[]).await?.rows { - for row in rows.into_typed::<(Vec,)>() { - let (blob_value,): (Vec,) = row?; - } +let result = session.query("SELECT a FROM keyspace.table", &[]).await?; +let mut iter = result.rows_typed::<(Vec,)>()?; +while let Some((blob_value,)) = iter.next().transpose()? { + println!("{:?}", blob_value); } # Ok(()) # } diff --git a/docs/source/data-types/collections.md b/docs/source/data-types/collections.md index 43301d31d2..5a1570ad3d 100644 --- a/docs/source/data-types/collections.md +++ b/docs/source/data-types/collections.md @@ -17,10 +17,10 @@ session .await?; // Read a list of ints from the table -if let Some(rows) = session.query("SELECT a FROM keyspace.table", &[]).await?.rows { - for row in rows.into_typed::<(Vec,)>() { - let (list_value,): (Vec,) = row?; - } +let result = session.query("SELECT a FROM keyspace.table", &[]).await?; +let mut iter = result.rows_typed::<(Vec,)>()?; +while let Some((list_value,)) = iter.next().transpose()? { + println!("{:?}", list_value); } # Ok(()) # } @@ -43,10 +43,10 @@ session .await?; // Read a set of ints from the table -if let Some(rows) = session.query("SELECT a FROM keyspace.table", &[]).await?.rows { - for row in rows.into_typed::<(Vec,)>() { - let (set_value,): (Vec,) = row?; - } +let result = session.query("SELECT a FROM keyspace.table", &[]).await?; +let mut iter = result.rows_typed::<(Vec,)>()?; +while let Some((list_value,)) = iter.next().transpose()? { + println!("{:?}", list_value); } # Ok(()) # } @@ -67,10 +67,10 @@ session .await?; // Read a set of ints from the table -if let Some(rows) = session.query("SELECT a FROM keyspace.table", &[]).await?.rows { - for row in rows.into_typed::<(HashSet,)>() { - let (set_value,): (HashSet,) = row?; - } +let result = session.query("SELECT a FROM keyspace.table", &[]).await?; +let mut iter = result.rows_typed::<(HashSet,)>()?; +while let Some((list_value,)) = iter.next().transpose()? { + println!("{:?}", list_value); } # Ok(()) # } @@ -91,10 +91,10 @@ session .await?; // Read a set of ints from the table -if let Some(rows) = session.query("SELECT a FROM keyspace.table", &[]).await?.rows { - for row in rows.into_typed::<(BTreeSet,)>() { - let (set_value,): (BTreeSet,) = row?; - } +let result = session.query("SELECT a FROM keyspace.table", &[]).await?; +let mut iter = result.rows_typed::<(BTreeSet,)>()?; +while let Some((list_value,)) = iter.next().transpose()? { + println!("{:?}", list_value); } # Ok(()) # } @@ -120,10 +120,10 @@ session .await?; // Read a map from the table -if let Some(rows) = session.query("SELECT a FROM keyspace.table", &[]).await?.rows { - for row in rows.into_typed::<(HashMap,)>() { - let (map_value,): (HashMap,) = row?; - } +let result = session.query("SELECT a FROM keyspace.table", &[]).await?; +let mut iter = result.rows_typed::<(HashMap,)>()?; +while let Some((map_value,)) = iter.next().transpose()? { + println!("{:?}", map_value); } # Ok(()) # } @@ -146,10 +146,10 @@ session .await?; // Read a map from the table -if let Some(rows) = session.query("SELECT a FROM keyspace.table", &[]).await?.rows { - for row in rows.into_typed::<(BTreeMap,)>() { - let (map_value,): (BTreeMap,) = row?; - } +let result = session.query("SELECT a FROM keyspace.table", &[]).await?; +let mut iter = result.rows_typed::<(BTreeMap,)>()?; +while let Some((map_value,)) = iter.next().transpose()? { + println!("{:?}", map_value); } # Ok(()) # } diff --git a/docs/source/data-types/counter.md b/docs/source/data-types/counter.md index 0f31b6cba7..37eb46439f 100644 --- a/docs/source/data-types/counter.md +++ b/docs/source/data-types/counter.md @@ -11,11 +11,11 @@ use scylla::IntoTypedRows; use scylla::frame::value::Counter; // Read counter from the table -if let Some(rows) = session.query("SELECT c FROM keyspace.table", &[]).await?.rows { - for row in rows.into_typed::<(Counter,)>() { - let (counter_value,): (Counter,) = row?; - let counter_int_value: i64 = counter_value.0; - } +let result = session.query("SELECT c FROM keyspace.table", &[]).await?; +let mut iter = result.rows_typed::<(Counter,)>()?; +while let Some((counter_value,)) = iter.next().transpose()? { + let counter_int_value: i64 = counter_value.0; + println!("{}", counter_int_value); } # Ok(()) # } diff --git a/docs/source/data-types/date.md b/docs/source/data-types/date.md index 6e59457f2e..202665295a 100644 --- a/docs/source/data-types/date.md +++ b/docs/source/data-types/date.md @@ -67,14 +67,10 @@ session .await?; // Read NaiveDate from the table -if let Some(rows) = session - .query("SELECT a FROM keyspace.table", &[]) - .await? - .rows -{ - for row in rows.into_typed::<(NaiveDate,)>() { - let (date_value,): (NaiveDate,) = row?; - } +let result = session.query("SELECT a FROM keyspace.table", &[]).await?; +let mut iter = result.rows_typed::<(NaiveDate,)>()?; +while let Some((date_value,)) = iter.next().transpose()? { + println!("{:?}", date_value); } # Ok(()) # } @@ -105,14 +101,10 @@ session .await?; // Read Date from the table -if let Some(rows) = session - .query("SELECT a FROM keyspace.table", &[]) - .await? - .rows -{ - for row in rows.into_typed::<(Date,)>() { - let (date_value,): (Date,) = row?; - } +let result = session.query("SELECT a FROM keyspace.table", &[]).await?; +let mut iter = result.rows_typed::<(Date,)>()?; +while let Some((date_value,)) = iter.next().transpose()? { + println!("{:?}", date_value); } # Ok(()) # } diff --git a/docs/source/data-types/decimal.md b/docs/source/data-types/decimal.md index 74926c8383..6eb3776f69 100644 --- a/docs/source/data-types/decimal.md +++ b/docs/source/data-types/decimal.md @@ -52,10 +52,10 @@ session .await?; // Read a decimal from the table -if let Some(rows) = session.query("SELECT a FROM keyspace.table", &[]).await?.rows { - for row in rows.into_typed::<(BigDecimal,)>() { - let (decimal_value,): (BigDecimal,) = row?; - } +let result = session.query("SELECT a FROM keyspace.table", &[]).await?; +let mut iter = result.rows_typed::<(BigDecimal,)>()?; +while let Some((decimal_value,)) = iter.next().transpose()? { + println!("{:?}", decimal_value); } # Ok(()) # } diff --git a/docs/source/data-types/duration.md b/docs/source/data-types/duration.md index 7526a478b3..79f9f47080 100644 --- a/docs/source/data-types/duration.md +++ b/docs/source/data-types/duration.md @@ -9,17 +9,17 @@ use scylla::IntoTypedRows; use scylla::frame::value::CqlDuration; -// Insert some ip address into the table +// Insert some duration into the table let to_insert: CqlDuration = CqlDuration { months: 1, days: 2, nanoseconds: 3 }; session .query("INSERT INTO keyspace.table (a) VALUES(?)", (to_insert,)) .await?; -// Read inet from the table -if let Some(rows) = session.query("SELECT a FROM keyspace.table", &[]).await?.rows { - for row in rows.into_typed::<(CqlDuration,)>() { - let (cql_duration,): (CqlDuration,) = row?; - } +// Read duration from the table +let result = session.query("SELECT a FROM keyspace.table", &[]).await?; +let mut iter = result.rows_typed::<(CqlDuration,)>()?; +while let Some((duration_value,)) = iter.next().transpose()? { + println!("{:?}", duration_value); } # Ok(()) # } diff --git a/docs/source/data-types/inet.md b/docs/source/data-types/inet.md index c585aefc05..c7c9f26ee9 100644 --- a/docs/source/data-types/inet.md +++ b/docs/source/data-types/inet.md @@ -16,10 +16,10 @@ session .await?; // Read inet from the table -if let Some(rows) = session.query("SELECT a FROM keyspace.table", &[]).await?.rows { - for row in rows.into_typed::<(IpAddr,)>() { - let (inet_value,): (IpAddr,) = row?; - } +let result = session.query("SELECT a FROM keyspace.table", &[]).await?; +let mut iter = result.rows_typed::<(IpAddr,)>()?; +while let Some((inet_value,)) = iter.next().transpose()? { + println!("{:?}", inet_value); } # Ok(()) # } diff --git a/docs/source/data-types/primitive.md b/docs/source/data-types/primitive.md index e521e5e6c7..2bc69ce6f0 100644 --- a/docs/source/data-types/primitive.md +++ b/docs/source/data-types/primitive.md @@ -1,6 +1,7 @@ # Bool, Tinyint, Smallint, Int, Bigint, Float, Double ### Bool + `Bool` is represented as rust `bool` ```rust @@ -17,16 +18,17 @@ session .await?; // Read a bool from the table -if let Some(rows) = session.query("SELECT a FROM keyspace.table", &[]).await?.rows { - for row in rows.into_typed::<(bool,)>() { - let (bool_value,): (bool,) = row?; - } +let result = session.query("SELECT a FROM keyspace.table", &[]).await?; +let mut iter = result.rows_typed::<(bool,)>()?; +while let Some((bool_value,)) = iter.next().transpose()? { + println!("{}", bool_value); } # Ok(()) # } ``` ### Tinyint + `Tinyint` is represented as rust `i8` ```rust @@ -43,16 +45,17 @@ session .await?; // Read a tinyint from the table -if let Some(rows) = session.query("SELECT a FROM keyspace.table", &[]).await?.rows { - for row in rows.into_typed::<(i8,)>() { - let (tinyint_value,): (i8,) = row?; - } +let result = session.query("SELECT a FROM keyspace.table", &[]).await?; +let mut iter = result.rows_typed::<(i8,)>()?; +while let Some((tinyint_value,)) = iter.next().transpose()? { + println!("{:?}", tinyint_value); } # Ok(()) # } ``` ### Smallint + `Smallint` is represented as rust `i16` ```rust @@ -69,16 +72,17 @@ session .await?; // Read a smallint from the table -if let Some(rows) = session.query("SELECT a FROM keyspace.table", &[]).await?.rows { - for row in rows.into_typed::<(i16,)>() { - let (smallint_value,): (i16,) = row?; - } +let result = session.query("SELECT a FROM keyspace.table", &[]).await?; +let mut iter = result.rows_typed::<(i16,)>()?; +while let Some((smallint_value,)) = iter.next().transpose()? { + println!("{}", smallint_value); } # Ok(()) # } ``` ### Int + `Int` is represented as rust `i32` ```rust @@ -95,16 +99,17 @@ session .await?; // Read an int from the table -if let Some(rows) = session.query("SELECT a FROM keyspace.table", &[]).await?.rows { - for row in rows.into_typed::<(i32,)>() { - let (int_value,): (i32,) = row?; - } +let result = session.query("SELECT a FROM keyspace.table", &[]).await?; +let mut iter = result.rows_typed::<(i32,)>()?; +while let Some((int_value,)) = iter.next().transpose()? { + println!("{}", int_value); } # Ok(()) # } ``` ### Bigint + `Bigint` is represented as rust `i64` ```rust @@ -121,16 +126,17 @@ session .await?; // Read a bigint from the table -if let Some(rows) = session.query("SELECT a FROM keyspace.table", &[]).await?.rows { - for row in rows.into_typed::<(i64,)>() { - let (bigint_value,): (i64,) = row?; - } +let result = session.query("SELECT a FROM keyspace.table", &[]).await?; +let mut iter = result.rows_typed::<(i64,)>()?; +while let Some((bigint_value,)) = iter.next().transpose()? { + println!("{:?}", bigint_value); } # Ok(()) # } ``` -### Float +### Float + `Float` is represented as rust `f32` ```rust @@ -147,16 +153,17 @@ session .await?; // Read a float from the table -if let Some(rows) = session.query("SELECT a FROM keyspace.table", &[]).await?.rows { - for row in rows.into_typed::<(f32,)>() { - let (float_value,): (f32,) = row?; - } +let result = session.query("SELECT a FROM keyspace.table", &[]).await?; +let mut iter = result.rows_typed::<(f32,)>()?; +while let Some((float_value,)) = iter.next().transpose()? { + println!("{:?}", float_value); } # Ok(()) # } ``` ### Double + `Double` is represented as rust `f64` ```rust @@ -173,11 +180,11 @@ session .await?; // Read a double from the table -if let Some(rows) = session.query("SELECT a FROM keyspace.table", &[]).await?.rows { - for row in rows.into_typed::<(f64,)>() { - let (double_value,): (f64,) = row?; - } +let result = session.query("SELECT a FROM keyspace.table", &[]).await?; +let mut iter = result.rows_typed::<(f64,)>()?; +while let Some((double_value,)) = iter.next().transpose()? { + println!("{:?}", double_value); } # Ok(()) # } -``` \ No newline at end of file +``` diff --git a/docs/source/data-types/text.md b/docs/source/data-types/text.md index 68479d233f..6d8fbf2b37 100644 --- a/docs/source/data-types/text.md +++ b/docs/source/data-types/text.md @@ -21,10 +21,10 @@ session .await?; // Read ascii/text/varchar from the table -if let Some(rows) = session.query("SELECT a FROM keyspace.table", &[]).await?.rows { - for row in rows.into_typed::<(String,)>() { - let (text_value,): (String,) = row?; - } +let result = session.query("SELECT a FROM keyspace.table", &[]).await?; +let mut iter = result.rows_typed::<(String,)>()?; +while let Some((text_value,)) = iter.next().transpose()? { + println!("{}", text_value); } # Ok(()) # } diff --git a/docs/source/data-types/time.md b/docs/source/data-types/time.md index 3faddaf7a3..588a1f6c2e 100644 --- a/docs/source/data-types/time.md +++ b/docs/source/data-types/time.md @@ -67,14 +67,10 @@ session .await?; // Read time from the table -if let Some(rows) = session - .query("SELECT a FROM keyspace.table", &[]) - .await? - .rows -{ - for row in rows.into_typed::<(NaiveTime,)>() { - let (time_value,): (NaiveTime,) = row?; - } +let result = session.query("SELECT a FROM keyspace.table", &[]).await?; +let mut iter = result.rows_typed::<(NaiveTime,)>()?; +while let Some((time_value,)) = iter.next().transpose()? { + println!("{:?}", time_value); } # Ok(()) # } @@ -103,14 +99,10 @@ session .await?; // Read time from the table -if let Some(rows) = session - .query("SELECT a FROM keyspace.table", &[]) - .await? - .rows -{ - for row in rows.into_typed::<(Time,)>() { - let (time_value,): (Time,) = row?; - } +let result = session.query("SELECT a FROM keyspace.table", &[]).await?; +let mut iter = result.rows_typed::<(Time,)>()?; +while let Some((time_value,)) = iter.next().transpose()? { + println!("{:?}", time_value); } # Ok(()) # } diff --git a/docs/source/data-types/timestamp.md b/docs/source/data-types/timestamp.md index 1be843754d..9a9e3e4754 100644 --- a/docs/source/data-types/timestamp.md +++ b/docs/source/data-types/timestamp.md @@ -72,14 +72,10 @@ session .await?; // Read timestamp from the table -if let Some(rows) = session - .query("SELECT a FROM keyspace.table", &[]) - .await? - .rows -{ - for row in rows.into_typed::<(DateTime,)>() { - let (timestamp_value,): (DateTime,) = row?; - } +let result = session.query("SELECT a FROM keyspace.table", &[]).await?; +let mut iter = result.rows_typed::<(DateTime,)>()?; +while let Some((timestamp_value,)) = iter.next().transpose()? { + println!("{:?}", timestamp_value); } # Ok(()) # } @@ -115,14 +111,10 @@ session .await?; // Read timestamp from the table -if let Some(rows) = session - .query("SELECT a FROM keyspace.table", &[]) - .await? - .rows -{ - for row in rows.into_typed::<(OffsetDateTime,)>() { - let (timestamp_value,): (OffsetDateTime,) = row?; - } +let result = session.query("SELECT a FROM keyspace.table", &[]).await?; +let mut iter = result.rows_typed::<(OffsetDateTime,)>()?; +while let Some((timestamp_value,)) = iter.next().transpose()? { + println!("{:?}", timestamp_value); } # Ok(()) # } diff --git a/docs/source/data-types/tuple.md b/docs/source/data-types/tuple.md index 74a41de947..c56c814913 100644 --- a/docs/source/data-types/tuple.md +++ b/docs/source/data-types/tuple.md @@ -1,4 +1,5 @@ # Tuple + `Tuple` is represented as rust tuples of max 16 elements. ```rust @@ -15,14 +16,13 @@ session .await?; // Read a tuple of int and string from the table -if let Some(rows) = session.query("SELECT a FROM keyspace.table", &[]).await?.rows { - for row in rows.into_typed::<((i32, String),)>() { - let (tuple_value,): ((i32, String),) = row?; - - let int_value: i32 = tuple_value.0; - let string_value: String = tuple_value.1; - } +let result = session.query("SELECT a FROM keyspace.table", &[]).await?; +let mut iter = result.rows_typed::<((i32, String),)>()?; +while let Some((tuple_value,)) = iter.next().transpose()? { + let int_value: i32 = tuple_value.0; + let string_value: String = tuple_value.1; + println!("({}, {})", int_value, string_value); } # Ok(()) # } -``` \ No newline at end of file +``` diff --git a/docs/source/data-types/udt.md b/docs/source/data-types/udt.md index ac5b134a62..c2ed650738 100644 --- a/docs/source/data-types/udt.md +++ b/docs/source/data-types/udt.md @@ -70,10 +70,10 @@ session .await?; // Read MyType from the table -if let Some(rows) = session.query("SELECT a FROM keyspace.table", &[]).await?.rows { - for row in rows.into_typed::<(MyType,)>() { - let (my_type_value,): (MyType,) = row?; - } +let result = session.query("SELECT a FROM keyspace.table", &[]).await?; +let mut iter = result.rows_typed::<(MyType,)>()?; +while let Some((my_type_value,)) = iter.next().transpose()? { + println!("{:?}", my_type_value); } # Ok(()) # } diff --git a/docs/source/data-types/uuid.md b/docs/source/data-types/uuid.md index 84ab1c2d1a..0e63b9ca71 100644 --- a/docs/source/data-types/uuid.md +++ b/docs/source/data-types/uuid.md @@ -18,10 +18,10 @@ session .await?; // Read uuid from the table -if let Some(rows) = session.query("SELECT a FROM keyspace.table", &[]).await?.rows { - for row in rows.into_typed::<(Uuid,)>() { - let (uuid_value,): (Uuid,) = row?; - } +let result = session.query("SELECT a FROM keyspace.table", &[]).await?; +let mut iter = result.rows_typed::<(Uuid,)>()?; +while let Some((uuid_value,)) = iter.next().transpose()? { + println!("{:?}", uuid_value); } # Ok(()) # } diff --git a/docs/source/data-types/varint.md b/docs/source/data-types/varint.md index 745520e66f..87b0f7e1d0 100644 --- a/docs/source/data-types/varint.md +++ b/docs/source/data-types/varint.md @@ -29,10 +29,10 @@ session .await?; // Read a varint from the table -if let Some(rows) = session.query("SELECT a FROM keyspace.table", &[]).await?.rows { - for row in rows.into_typed::<(BigInt,)>() { - let (varint_value,): (BigInt,) = row?; - } +let result = session.query("SELECT a FROM keyspace.table", &[]).await?; +let mut iter = result.rows_typed::<(BigInt,)>()?; +while let Some((varint_value,)) = iter.next().transpose()? { + println!("{:?}", varint_value); } # Ok(()) # } diff --git a/docs/source/queries/simple.md b/docs/source/queries/simple.md index 5b668013a1..ed852611fb 100644 --- a/docs/source/queries/simple.md +++ b/docs/source/queries/simple.md @@ -84,12 +84,10 @@ Each row can be parsed as a tuple of rust types using `into_typed`: use scylla::IntoTypedRows; // Query rows from the table and print them -if let Some(rows) = session.query("SELECT a FROM ks.tab", &[]).await?.rows { - // Parse each row as a tuple containing single i32 - for row in rows.into_typed::<(i32,)>() { - let read_row: (i32,) = row?; - println!("Read a value from row: {}", read_row.0); - } +let result = session.query("SELECT a FROM ks.tab", &[]).await?; +let mut iter = result.rows_typed::<(i32,)>()?; +while let Some(read_row) = iter.next().transpose()? { + println!("Read a value from row: {}", read_row.0); } # Ok(()) # } diff --git a/docs/source/quickstart/example.md b/docs/source/quickstart/example.md index 19138b947a..32e01b2d5f 100644 --- a/docs/source/quickstart/example.md +++ b/docs/source/quickstart/example.md @@ -43,12 +43,10 @@ async fn main() -> Result<(), Box> { .await?; // Query rows from the table and print them - if let Some(rows) = session.query("SELECT a FROM ks.extab", &[]).await?.rows { - // Parse each row as a tuple containing single i32 - for row in rows.into_typed::<(i32,)>() { - let read_row: (i32,) = row?; - println!("Read a value from row: {}", read_row.0); - } + let result = session.query("SELECT a FROM ks.extab", &[]).await?; + let mut iter = result.rows_typed::<(i32,)>()?; + while let Some(read_row) = iter.next().transpose()? { + println!("Read a value from row: {}", read_row.0); } Ok(()) diff --git a/examples/basic.rs b/examples/basic.rs index ecae95068b..70c405b825 100644 --- a/examples/basic.rs +++ b/examples/basic.rs @@ -1,6 +1,6 @@ use anyhow::Result; use scylla::macros::FromRow; -use scylla::transport::session::{IntoTypedRows, Session}; +use scylla::transport::session::Session; use scylla::SessionBuilder; use std::env; @@ -49,15 +49,12 @@ async fn main() -> Result<()> { .await?; // Rows can be parsed as tuples - if let Some(rows) = session + let result = session .query("SELECT a, b, c FROM examples_ks.basic", &[]) - .await? - .rows - { - for row in rows.into_typed::<(i32, i32, String)>() { - let (a, b, c) = row?; - println!("a, b, c: {}, {}, {}", a, b, c); - } + .await?; + let mut iter = result.rows_typed::<(i32, i32, String)>()?; + while let Some((a, b, c)) = iter.next().transpose()? { + println!("a, b, c: {}, {}, {}", a, b, c); } // Or as custom structs that derive FromRow @@ -68,32 +65,12 @@ async fn main() -> Result<()> { _c: String, } - if let Some(rows) = session - .query("SELECT a, b, c FROM examples_ks.basic", &[]) - .await? - .rows - { - for row_data in rows.into_typed::() { - let row_data = row_data?; - println!("row_data: {:?}", row_data); - } - } - - // Or simply as untyped rows - if let Some(rows) = session + let result = session .query("SELECT a, b, c FROM examples_ks.basic", &[]) - .await? - .rows - { - for row in rows { - let a = row.columns[0].as_ref().unwrap().as_int().unwrap(); - let b = row.columns[1].as_ref().unwrap().as_int().unwrap(); - let c = row.columns[2].as_ref().unwrap().as_text().unwrap(); - println!("a, b, c: {}, {}, {}", a, b, c); - - // Alternatively each row can be parsed individually - // let (a2, b2, c2) = row.into_typed::<(i32, i32, String)>() ?; - } + .await?; + let mut iter = result.rows_typed::<(i32, i32, String)>()?; + while let Some(row_data) = iter.next().transpose()? { + println!("row_data: {:?}", row_data); } let metrics = session.get_metrics(); diff --git a/examples/compare-tokens.rs b/examples/compare-tokens.rs index 294dc7842b..1b1a66afc9 100644 --- a/examples/compare-tokens.rs +++ b/examples/compare-tokens.rs @@ -45,24 +45,13 @@ async fn main() -> Result<()> { .collect::>() ); - let qt = session + let (qt,) = session .query( - format!( - "SELECT token(pk) FROM examples_ks.compare_tokens where pk = {}", - pk - ), - &[], + "SELECT token(pk) FROM examples_ks.compare_tokens where pk = ?", + (pk,), ) .await? - .rows - .unwrap() - .first() - .expect("token query no rows!") - .columns[0] - .as_ref() - .expect("token query null value!") - .as_bigint() - .expect("token wrong type!"); + .single_row_typed::<(i64,)>()?; assert_eq!(t, qt); println!("token for {}: {}", pk, t); } diff --git a/examples/cql-time-types.rs b/examples/cql-time-types.rs index fed03f3949..93a6a372eb 100644 --- a/examples/cql-time-types.rs +++ b/examples/cql-time-types.rs @@ -5,7 +5,7 @@ use anyhow::Result; use chrono::{DateTime, NaiveDate, NaiveTime, Utc}; use scylla::frame::response::result::CqlValue; use scylla::frame::value::{CqlDate, CqlTime, CqlTimestamp}; -use scylla::transport::session::{IntoTypedRows, Session}; +use scylla::transport::session::Session; use scylla::SessionBuilder; use std::env; @@ -40,19 +40,16 @@ async fn main() -> Result<()> { ) .await?; - if let Some(rows) = session + let result = session .query("SELECT d from examples_ks.dates", &[]) - .await? - .rows - { - for row in rows.into_typed::<(NaiveDate,)>() { - let (read_date,): (NaiveDate,) = match row { - Ok(read_date) => read_date, - Err(_) => continue, // We might read a date that does not fit in NaiveDate, skip it - }; - - println!("Parsed a date into chrono::NaiveDate: {:?}", read_date); - } + .await?; + for row in result.rows_typed::<(NaiveDate,)>()? { + let (read_date,): (NaiveDate,) = match row { + Ok(read_date) => read_date, + Err(_) => continue, // We might read a date that does not fit in NaiveDate, skip it + }; + + println!("Parsed a date into chrono::NaiveDate: {:?}", read_date); } // Alternatively, you can enable 'time' feature and use `time::Date` to represent date. `time::Date` only allows @@ -64,19 +61,16 @@ async fn main() -> Result<()> { .query("INSERT INTO examples_ks.dates (d) VALUES (?)", (time_date,)) .await?; - if let Some(rows) = session + let result = session .query("SELECT d from examples_ks.dates", &[]) - .await? - .rows - { - for row in rows.into_typed::<(time::Date,)>() { - let (read_date,) = match row { - Ok(read_date) => read_date, - Err(_) => continue, // We might read a date that does not fit in time::Date, skip it - }; - - println!("Parsed a date into time::Date: {:?}", read_date); - } + .await?; + for row in result.rows_typed::<(time::Date,)>()? { + let (read_date,) = match row { + Ok(read_date) => read_date, + Err(_) => continue, // We might read a date that does not fit in time::Date, skip it + }; + + println!("Parsed a date into time::Date: {:?}", read_date); } // Dates outside this range must be represented in the raw form - an u32 describing days since -5877641-06-23 @@ -88,19 +82,17 @@ async fn main() -> Result<()> { ) .await?; - if let Some(rows) = session + let result = session .query("SELECT d from examples_ks.dates", &[]) - .await? - .rows - { - for row in rows { - let read_days: u32 = match row.columns[0] { - Some(CqlValue::Date(CqlDate(days))) => days, - _ => panic!("oh no"), - }; - - println!("Read a date as raw days: {}", read_days); - } + .await?; + let mut iter = result.rows_typed::<(CqlValue,)>()?; + while let Some((value,)) = iter.next().transpose()? { + let read_days: u32 = match value { + CqlValue::Date(CqlDate(days)) => days, + _ => panic!("oh no"), + }; + + println!("Read a date as raw days: {}", read_days); } // Time @@ -126,16 +118,12 @@ async fn main() -> Result<()> { ) .await?; - if let Some(rows) = session + let result = session .query("SELECT t from examples_ks.times", &[]) - .await? - .rows - { - for row in rows.into_typed::<(NaiveTime,)>() { - let (read_time,) = row?; - - println!("Parsed a time into chrono::NaiveTime: {:?}", read_time); - } + .await?; + let mut iter = result.rows_typed::<(NaiveTime,)>()?; + while let Some((read_time,)) = iter.next().transpose()? { + println!("Parsed a time into chrono::NaiveTime: {:?}", read_time); } // time::Time @@ -145,16 +133,12 @@ async fn main() -> Result<()> { .query("INSERT INTO examples_ks.times (t) VALUES (?)", (time_time,)) .await?; - if let Some(rows) = session + let result = session .query("SELECT t from examples_ks.times", &[]) - .await? - .rows - { - for row in rows.into_typed::<(time::Time,)>() { - let (read_time,) = row?; - - println!("Parsed a time into time::Time: {:?}", read_time); - } + .await?; + let mut iter = result.rows_typed::<(time::Time,)>()?; + while let Some((read_time,)) = iter.next().transpose()? { + println!("Parsed a time into time::Time: {:?}", read_time); } // CqlTime @@ -164,16 +148,12 @@ async fn main() -> Result<()> { .query("INSERT INTO examples_ks.times (t) VALUES (?)", (time_time,)) .await?; - if let Some(rows) = session + let result = session .query("SELECT t from examples_ks.times", &[]) - .await? - .rows - { - for row in rows.into_typed::<(CqlTime,)>() { - let (read_time,) = row?; - - println!("Read a time as raw nanos: {:?}", read_time); - } + .await?; + let mut iter = result.rows_typed::<(CqlTime,)>()?; + while let Some((read_time,)) = iter.next().transpose()? { + println!("Read a time as raw nanos: {:?}", read_time); } // Timestamp @@ -199,19 +179,15 @@ async fn main() -> Result<()> { ) .await?; - if let Some(rows) = session + let result = session .query("SELECT t from examples_ks.timestamps", &[]) - .await? - .rows - { - for row in rows.into_typed::<(DateTime,)>() { - let (read_time,) = row?; - - println!( - "Parsed a timestamp into chrono::DateTime: {:?}", - read_time - ); - } + .await?; + let mut iter = result.rows_typed::<(DateTime,)>()?; + while let Some((read_time,)) = iter.next().transpose()? { + println!( + "Parsed a timestamp into chrono::DateTime: {:?}", + read_time + ); } // time::OffsetDateTime @@ -224,19 +200,15 @@ async fn main() -> Result<()> { ) .await?; - if let Some(rows) = session + let result = session .query("SELECT t from examples_ks.timestamps", &[]) - .await? - .rows - { - for row in rows.into_typed::<(time::OffsetDateTime,)>() { - let (read_time,) = row?; - - println!( - "Parsed a timestamp into time::OffsetDateTime: {:?}", - read_time - ); - } + .await?; + let mut iter = result.rows_typed::<(time::OffsetDateTime,)>()?; + while let Some((read_time,)) = iter.next().transpose()? { + println!( + "Parsed a timestamp into time::OffsetDateTime: {:?}", + read_time + ); } // CqlTimestamp @@ -249,16 +221,12 @@ async fn main() -> Result<()> { ) .await?; - if let Some(rows) = session + let result = session .query("SELECT t from examples_ks.timestamps", &[]) - .await? - .rows - { - for row in rows.into_typed::<(CqlTimestamp,)>() { - let (read_time,) = row?; - - println!("Read a timestamp as raw millis: {:?}", read_time); - } + .await?; + let mut iter = result.rows_typed::<(CqlTimestamp,)>()?; + while let Some((read_time,)) = iter.next().transpose()? { + println!("Read a timestamp as raw millis: {:?}", read_time); } Ok(()) diff --git a/examples/schema_agreement.rs b/examples/schema_agreement.rs index 2f7189c1f1..ed12af95c8 100644 --- a/examples/schema_agreement.rs +++ b/examples/schema_agreement.rs @@ -1,6 +1,6 @@ use anyhow::{bail, Result}; use scylla::transport::errors::QueryError; -use scylla::transport::session::{IntoTypedRows, Session}; +use scylla::transport::session::Session; use scylla::SessionBuilder; use std::env; use std::time::Duration; @@ -66,16 +66,14 @@ async fn main() -> Result<()> { .await?; // Rows can be parsed as tuples - if let Some(rows) = session + let result = session .query("SELECT a, b, c FROM examples_ks.schema_agreement", &[]) - .await? - .rows - { - for row in rows.into_typed::<(i32, i32, String)>() { - let (a, b, c) = row?; - println!("a, b, c: {}, {}, {}", a, b, c); - } + .await?; + let mut iter = result.rows_typed::<(i32, i32, String)>()?; + while let Some((a, b, c)) = iter.next().transpose()? { + println!("a, b, c: {}, {}, {}", a, b, c); } + println!("Ok."); let schema_version = session.await_schema_agreement().await?; diff --git a/examples/select-paging.rs b/examples/select-paging.rs index 8cee4742cb..ea6cb256e0 100644 --- a/examples/select-paging.rs +++ b/examples/select-paging.rs @@ -45,7 +45,7 @@ async fn main() -> Result<()> { println!( "Paging state: {:#?} ({} rows)", res1.paging_state, - res1.rows.unwrap().len() + res1.rows_num()?, ); let res2 = session .query_paged(paged_query.clone(), &[], res1.paging_state) @@ -53,7 +53,7 @@ async fn main() -> Result<()> { println!( "Paging state: {:#?} ({} rows)", res2.paging_state, - res2.rows.unwrap().len() + res2.rows_num()?, ); let res3 = session .query_paged(paged_query.clone(), &[], res2.paging_state) @@ -61,7 +61,7 @@ async fn main() -> Result<()> { println!( "Paging state: {:#?} ({} rows)", res3.paging_state, - res3.rows.unwrap().len() + res3.rows_num()?, ); let paged_prepared = session @@ -71,7 +71,7 @@ async fn main() -> Result<()> { println!( "Paging state from the prepared statement execution: {:#?} ({} rows)", res4.paging_state, - res4.rows.unwrap().len() + res4.rows_num()?, ); let res5 = session .execute_paged(&paged_prepared, &[], res4.paging_state) @@ -79,7 +79,7 @@ async fn main() -> Result<()> { println!( "Paging state from the second prepared statement execution: {:#?} ({} rows)", res5.paging_state, - res5.rows.unwrap().len() + res5.rows_num()?, ); let res6 = session .execute_paged(&paged_prepared, &[], res5.paging_state) @@ -87,7 +87,7 @@ async fn main() -> Result<()> { println!( "Paging state from the third prepared statement execution: {:#?} ({} rows)", res6.paging_state, - res6.rows.unwrap().len() + res6.rows_num()?, ); println!("Ok."); diff --git a/examples/tls.rs b/examples/tls.rs index 4ecfc62fa4..b54296bae5 100644 --- a/examples/tls.rs +++ b/examples/tls.rs @@ -1,5 +1,5 @@ use anyhow::Result; -use scylla::transport::session::{IntoTypedRows, Session}; +use scylla::transport::session::Session; use scylla::SessionBuilder; use std::env; use std::fs; @@ -86,16 +86,14 @@ async fn main() -> Result<()> { .await?; // Rows can be parsed as tuples - if let Some(rows) = session + let result = session .query("SELECT a, b, c FROM examples_ks.tls", &[]) - .await? - .rows - { - for row in rows.into_typed::<(i32, i32, String)>() { - let (a, b, c) = row?; - println!("a, b, c: {}, {}, {}", a, b, c); - } + .await?; + let mut iter = result.rows_typed::<(i32, i32, String)>()?; + while let Some((a, b, c)) = iter.next().transpose()? { + println!("a, b, c: {}, {}, {}", a, b, c); } + println!("Ok."); Ok(()) diff --git a/examples/user-defined-type.rs b/examples/user-defined-type.rs index 4ed2304a6b..40ec38685e 100644 --- a/examples/user-defined-type.rs +++ b/examples/user-defined-type.rs @@ -1,6 +1,6 @@ use anyhow::Result; use scylla::macros::FromUserType; -use scylla::{IntoTypedRows, SerializeCql, Session, SessionBuilder}; +use scylla::{SerializeCql, Session, SessionBuilder}; use std::env; #[tokio::main] @@ -49,15 +49,12 @@ async fn main() -> Result<()> { .await?; // And read like any normal value - if let Some(rows) = session + let result = session .query("SELECT my FROM examples_ks.user_defined_type_table", &[]) - .await? - .rows - { - for row in rows.into_typed::<(MyType,)>() { - let (my_type_value,): (MyType,) = row?; - println!("{:?}", my_type_value) - } + .await?; + let mut iter = result.rows_typed::<(MyType,)>()?; + while let Some((my_val,)) = iter.next().transpose()? { + println!("{:?}", my_val); } println!("Ok."); diff --git a/scylla/src/transport/caching_session.rs b/scylla/src/transport/caching_session.rs index 464286602b..99b910985d 100644 --- a/scylla/src/transport/caching_session.rs +++ b/scylla/src/transport/caching_session.rs @@ -322,7 +322,7 @@ mod tests { .unwrap(); assert_eq!(1, session.cache.len()); - assert_eq!(1, result.rows.unwrap().len()); + assert_eq!(1, result.rows_num().unwrap()); let result = session .execute("select * from test_table", &[]) @@ -330,7 +330,7 @@ mod tests { .unwrap(); assert_eq!(1, session.cache.len()); - assert_eq!(1, result.rows.unwrap().len()); + assert_eq!(1, result.rows_num().unwrap()); } /// Checks that caching works with execute_iter @@ -364,7 +364,7 @@ mod tests { .unwrap(); assert_eq!(1, session.cache.len()); - assert_eq!(1, result.rows.unwrap().len()); + assert_eq!(1, result.rows_num().unwrap()); } async fn assert_test_batch_table_rows_contain( diff --git a/scylla/src/transport/connection.rs b/scylla/src/transport/connection.rs index 6d8af2aeb5..6c1e2bfcf3 100644 --- a/scylla/src/transport/connection.rs +++ b/scylla/src/transport/connection.rs @@ -43,6 +43,7 @@ use std::{ use super::errors::{BadKeyspaceName, DbError, QueryError}; use super::iterator::RowIterator; +use super::query_result::SingleRowTypedError; use super::session::AddressTranslator; use super::topology::{PeerEndpoint, UntranslatedEndpoint, UntranslatedPeer}; use super::NodeAddr; @@ -62,7 +63,6 @@ use crate::query::Query; use crate::routing::ShardInfo; use crate::statement::prepared_statement::PreparedStatement; use crate::statement::Consistency; -use crate::transport::session::IntoTypedRows; use crate::transport::Compression; use crate::QueryResult; @@ -963,12 +963,18 @@ impl Connection { let (version_id,): (Uuid,) = self .query_single_page(LOCAL_VERSION) .await? - .rows - .ok_or(QueryError::ProtocolError("Version query returned not rows"))? - .into_typed::<(Uuid,)>() - .next() - .ok_or(QueryError::ProtocolError("Admin table returned empty rows"))? - .map_err(|_| QueryError::ProtocolError("Row is not uuid type as it should be"))?; + .single_row_typed() + .map_err(|err| match err { + SingleRowTypedError::RowsExpected(_) => { + QueryError::ProtocolError("Version query returned not rows") + } + SingleRowTypedError::BadNumberOfRows(_) => { + QueryError::ProtocolError("system.local query returned a wrong number of rows") + } + SingleRowTypedError::FromRowError(_) => { + QueryError::ProtocolError("Row is not uuid type as it should be") + } + })?; Ok(version_id) } diff --git a/scylla/src/transport/cql_collections_test.rs b/scylla/src/transport/cql_collections_test.rs index a3e8c76089..9986880835 100644 --- a/scylla/src/transport/cql_collections_test.rs +++ b/scylla/src/transport/cql_collections_test.rs @@ -1,7 +1,7 @@ use crate::cql_to_rust::FromCqlVal; use crate::test_utils::create_new_session_builder; use crate::utils::test_utils::unique_keyspace_name; -use crate::{frame::response::result::CqlValue, IntoTypedRows, Session}; +use crate::{frame::response::result::CqlValue, Session}; use scylla_cql::types::serialize::value::SerializeCql; use std::collections::{BTreeMap, BTreeSet, HashMap, HashSet}; @@ -48,11 +48,7 @@ async fn insert_and_select( .query(format!("SELECT val FROM {} WHERE p = 0", table_name), ()) .await .unwrap() - .rows - .unwrap() - .into_typed::<(SelectT,)>() - .next() - .unwrap() + .single_row_typed::<(SelectT,)>() .unwrap() .0; diff --git a/scylla/src/transport/cql_types_test.rs b/scylla/src/transport/cql_types_test.rs index 34fff67a67..de8fbf3259 100644 --- a/scylla/src/transport/cql_types_test.rs +++ b/scylla/src/transport/cql_types_test.rs @@ -4,7 +4,6 @@ use crate::frame::response::result::CqlValue; use crate::frame::value::{Counter, CqlDate, CqlTime, CqlTimestamp}; use crate::macros::FromUserType; use crate::test_utils::create_new_session_builder; -use crate::transport::session::IntoTypedRows; use crate::transport::session::Session; use crate::utils::test_utils::unique_keyspace_name; use itertools::Itertools; @@ -90,9 +89,8 @@ where .query(select_values, &[]) .await .unwrap() - .rows + .rows_typed::<(T,)>() .unwrap() - .into_typed::<(T,)>() .map(Result::unwrap) .map(|row| row.0) .collect::>(); @@ -206,9 +204,8 @@ async fn test_cql_varint() { .execute(&prepared_select, &[]) .await .unwrap() - .rows + .rows_typed::<(CqlVarint,)>() .unwrap() - .into_typed::<(CqlVarint,)>() .map(Result::unwrap) .map(|row| row.0) .collect::>(); @@ -278,9 +275,8 @@ async fn test_counter() { .query(select_values, (i as i32,)) .await .unwrap() - .rows + .rows_typed::<(Counter,)>() .unwrap() - .into_typed::<(Counter,)>() .map(Result::unwrap) .map(|row| row.0) .collect::>(); @@ -354,9 +350,8 @@ async fn test_naive_date() { .query("SELECT val from chrono_naive_date_tests", &[]) .await .unwrap() - .rows + .rows_typed::<(NaiveDate,)>() .unwrap() - .into_typed::<(NaiveDate,)>() .next() .unwrap() .ok() @@ -378,11 +373,7 @@ async fn test_naive_date() { .query("SELECT val from chrono_naive_date_tests", &[]) .await .unwrap() - .rows - .unwrap() - .into_typed::<(NaiveDate,)>() - .next() - .unwrap() + .single_row_typed::<(NaiveDate,)>() .unwrap(); assert_eq!(read_date, *naive_date); } @@ -568,11 +559,7 @@ async fn test_cql_time() { .query("SELECT val from cql_time_tests", &[]) .await .unwrap() - .rows - .unwrap() - .into_typed::<(CqlTime,)>() - .next() - .unwrap() + .single_row_typed::<(CqlTime,)>() .unwrap(); assert_eq!(read_time, *time_duration); @@ -590,11 +577,7 @@ async fn test_cql_time() { .query("SELECT val from cql_time_tests", &[]) .await .unwrap() - .rows - .unwrap() - .into_typed::<(CqlTime,)>() - .next() - .unwrap() + .single_row_typed::<(CqlTime,)>() .unwrap(); assert_eq!(read_time, *time_duration); @@ -820,11 +803,7 @@ async fn test_cql_timestamp() { .query("SELECT val from cql_timestamp_tests", &[]) .await .unwrap() - .rows - .unwrap() - .into_typed::<(CqlTimestamp,)>() - .next() - .unwrap() + .single_row_typed::<(CqlTimestamp,)>() .unwrap(); assert_eq!(read_timestamp, *timestamp_duration); @@ -842,11 +821,7 @@ async fn test_cql_timestamp() { .query("SELECT val from cql_timestamp_tests", &[]) .await .unwrap() - .rows - .unwrap() - .into_typed::<(CqlTimestamp,)>() - .next() - .unwrap() + .single_row_typed::<(CqlTimestamp,)>() .unwrap(); assert_eq!(read_timestamp, *timestamp_duration); @@ -1202,11 +1177,7 @@ async fn test_timeuuid() { .query("SELECT val from timeuuid_tests", &[]) .await .unwrap() - .rows - .unwrap() - .into_typed::<(CqlTimeuuid,)>() - .next() - .unwrap() + .single_row_typed::<(CqlTimeuuid,)>() .unwrap(); assert_eq!(read_timeuuid.as_bytes(), timeuuid_bytes); @@ -1225,11 +1196,7 @@ async fn test_timeuuid() { .query("SELECT val from timeuuid_tests", &[]) .await .unwrap() - .rows - .unwrap() - .into_typed::<(CqlTimeuuid,)>() - .next() - .unwrap() + .single_row_typed::<(CqlTimeuuid,)>() .unwrap(); assert_eq!(read_timeuuid.as_bytes(), timeuuid_bytes); @@ -1372,11 +1339,7 @@ async fn test_inet() { .query("SELECT val from inet_tests WHERE id = 0", &[]) .await .unwrap() - .rows - .unwrap() - .into_typed::<(IpAddr,)>() - .next() - .unwrap() + .single_row_typed::<(IpAddr,)>() .unwrap(); assert_eq!(read_inet, *inet); @@ -1391,11 +1354,7 @@ async fn test_inet() { .query("SELECT val from inet_tests WHERE id = 0", &[]) .await .unwrap() - .rows - .unwrap() - .into_typed::<(IpAddr,)>() - .next() - .unwrap() + .single_row_typed::<(IpAddr,)>() .unwrap(); assert_eq!(read_inet, *inet); @@ -1445,11 +1404,7 @@ async fn test_blob() { .query("SELECT val from blob_tests WHERE id = 0", &[]) .await .unwrap() - .rows - .unwrap() - .into_typed::<(Vec,)>() - .next() - .unwrap() + .single_row_typed::<(Vec,)>() .unwrap(); assert_eq!(read_blob, *blob); @@ -1464,11 +1419,7 @@ async fn test_blob() { .query("SELECT val from blob_tests WHERE id = 0", &[]) .await .unwrap() - .rows - .unwrap() - .into_typed::<(Vec,)>() - .next() - .unwrap() + .single_row_typed::<(Vec,)>() .unwrap(); assert_eq!(read_blob, *blob); @@ -1555,11 +1506,7 @@ async fn test_udt_after_schema_update() { .query(format!("SELECT val from {} WHERE id = 0", table_name), &[]) .await .unwrap() - .rows - .unwrap() - .into_typed::<(UdtV1,)>() - .next() - .unwrap() + .single_row_typed::<(UdtV1,)>() .unwrap(); assert_eq!(read_udt, v1); @@ -1576,11 +1523,7 @@ async fn test_udt_after_schema_update() { .query(format!("SELECT val from {} WHERE id = 0", table_name), &[]) .await .unwrap() - .rows - .unwrap() - .into_typed::<(UdtV1,)>() - .next() - .unwrap() + .single_row_typed::<(UdtV1,)>() .unwrap(); assert_eq!(read_udt, v1); @@ -1601,11 +1544,7 @@ async fn test_udt_after_schema_update() { .query(format!("SELECT val from {} WHERE id = 0", table_name), &[]) .await .unwrap() - .rows - .unwrap() - .into_typed::<(UdtV2,)>() - .next() - .unwrap() + .single_row_typed::<(UdtV2,)>() .unwrap(); assert_eq!( @@ -1736,11 +1675,7 @@ async fn test_udt_with_missing_field() { ) .await .unwrap() - .rows - .unwrap() - .into_typed::<(TR,)>() - .next() - .unwrap() + .single_row_typed::<(TR,)>() .unwrap() .0; assert_eq!(expected, result); diff --git a/scylla/src/transport/session.rs b/scylla/src/transport/session.rs index f4f5ab2365..93903afe8c 100644 --- a/scylla/src/transport/session.rs +++ b/scylla/src/transport/session.rs @@ -44,6 +44,7 @@ use super::execution_profile::{ExecutionProfile, ExecutionProfileHandle, Executi use super::node::CloudEndpoint; use super::node::KnownNode; use super::partitioner::PartitionerName; +use super::query_result::MaybeFirstRowTypedError; use super::topology::UntranslatedPeer; use super::NodeRef; use crate::cql_to_rust::FromRow; @@ -1464,30 +1465,26 @@ impl Session { )?; // Get tracing info - let tracing_info_row_res: Option> = traces_session_res - .rows - .ok_or(QueryError::ProtocolError( - "Response to system_traces.sessions query was not Rows", - ))? - .into_typed::() - .next(); - - let mut tracing_info: TracingInfo = match tracing_info_row_res { - Some(tracing_info_row_res) => tracing_info_row_res.map_err(|_| { - QueryError::ProtocolError( + let maybe_tracing_info: Option = traces_session_res + .maybe_first_row_typed() + .map_err(|err| match err { + MaybeFirstRowTypedError::RowsExpected(_) => QueryError::ProtocolError( + "Response to system_traces.sessions query was not Rows", + ), + MaybeFirstRowTypedError::FromRowError(_) => QueryError::ProtocolError( "Columns from system_traces.session have an unexpected type", - ) - })?, + ), + })?; + + let mut tracing_info = match maybe_tracing_info { None => return Ok(None), + Some(tracing_info) => tracing_info, }; // Get tracing events - let tracing_event_rows = traces_events_res - .rows - .ok_or(QueryError::ProtocolError( - "Response to system_traces.events query was not Rows", - ))? - .into_typed::(); + let tracing_event_rows = traces_events_res.rows_typed().map_err(|_| { + QueryError::ProtocolError("Response to system_traces.events query was not Rows") + })?; for event in tracing_event_rows { let tracing_event: TracingEvent = event.map_err(|_| { diff --git a/scylla/src/transport/session_test.rs b/scylla/src/transport/session_test.rs index b6c9c20ba4..7d551b0f84 100644 --- a/scylla/src/transport/session_test.rs +++ b/scylla/src/transport/session_test.rs @@ -22,7 +22,7 @@ use crate::utils::test_utils::{ use crate::CachingSession; use crate::ExecutionProfile; use crate::QueryResult; -use crate::{IntoTypedRows, Session, SessionBuilder}; +use crate::{Session, SessionBuilder}; use assert_matches::assert_matches; use bytes::Bytes; use futures::{FutureExt, StreamExt, TryStreamExt}; @@ -221,19 +221,13 @@ async fn test_prepared_statement() { // Verify that token calculation is compatible with Scylla { - let rs = session + let (value,): (i64,) = session .query(format!("SELECT token(a) FROM {}.t2", ks), &[]) .await .unwrap() - .rows + .single_row_typed() .unwrap(); - let token = Token { - value: rs.first().unwrap().columns[0] - .as_ref() - .unwrap() - .as_bigint() - .unwrap(), - }; + let token = Token { value }; let prepared_token = Murmur3Partitioner .hash_one(&prepared_statement.compute_partition_key(&values).unwrap()); assert_eq!(token, prepared_token); @@ -246,19 +240,13 @@ async fn test_prepared_statement() { assert_eq!(token, cluster_data_token); } { - let rs = session + let (value,): (i64,) = session .query(format!("SELECT token(a,b,c) FROM {}.complex_pk", ks), &[]) .await .unwrap() - .rows + .single_row_typed() .unwrap(); - let token = Token { - value: rs.first().unwrap().columns[0] - .as_ref() - .unwrap() - .as_bigint() - .unwrap(), - }; + let token = Token { value }; let prepared_token = Murmur3Partitioner.hash_one( &prepared_complex_pk_statement .compute_partition_key(&values) @@ -306,20 +294,17 @@ async fn test_prepared_statement() { assert_eq!(results_from_manual_paging, rs); } { - let rs = session + let (a, b, c, d, e): (i32, i32, String, i32, Option) = session .query(format!("SELECT a,b,c,d,e FROM {}.complex_pk", ks), &[]) .await .unwrap() - .rows + .single_row_typed() .unwrap(); - let r = rs.first().unwrap(); - let a = r.columns[0].as_ref().unwrap().as_int().unwrap(); - let b = r.columns[1].as_ref().unwrap().as_int().unwrap(); - let c = r.columns[2].as_ref().unwrap().as_text().unwrap(); - let d = r.columns[3].as_ref().unwrap().as_int().unwrap(); - let e = r.columns[4].as_ref(); assert!(e.is_none()); - assert_eq!((a, b, c, d), (17, 16, &String::from("I'm prepared!!!"), 7)) + assert_eq!( + (a, b, c.as_str(), d, e), + (17, 16, "I'm prepared!!!", 7, None) + ); } // Check that SerializeRow macro works { @@ -349,7 +334,7 @@ async fn test_prepared_statement() { ) .await .unwrap(); - let mut rs = session + let output: ComplexPk = session .query( format!( "SELECT * FROM {}.complex_pk WHERE a = 9 and b = 8 and c = 'seven'", @@ -359,10 +344,8 @@ async fn test_prepared_statement() { ) .await .unwrap() - .rows - .unwrap() - .into_typed::(); - let output = rs.next().unwrap().unwrap(); + .single_row_typed() + .unwrap(); assert_eq!(input, output) } } @@ -419,29 +402,22 @@ async fn test_batch() { .await .unwrap(); - let rs = session + let mut results: Vec<(i32, i32, String)> = session .query(format!("SELECT a, b, c FROM {}.t_batch", ks), &[]) .await .unwrap() - .rows + .rows_typed() + .unwrap() + .collect::>() .unwrap(); - let mut results: Vec<(i32, i32, &String)> = rs - .iter() - .map(|r| { - let a = r.columns[0].as_ref().unwrap().as_int().unwrap(); - let b = r.columns[1].as_ref().unwrap().as_int().unwrap(); - let c = r.columns[2].as_ref().unwrap().as_text().unwrap(); - (a, b, c) - }) - .collect(); results.sort(); assert_eq!( results, vec![ - (1, 2, &String::from("abc")), - (1, 4, &String::from("hello")), - (7, 11, &String::from("")) + (1, 2, String::from("abc")), + (1, 4, String::from("hello")), + (7, 11, String::from("")) ] ); @@ -460,26 +436,19 @@ async fn test_batch() { .unwrap(); session.batch(&batch, values).await.unwrap(); - let rs = session + let results: Vec<(i32, i32, String)> = session .query( format!("SELECT a, b, c FROM {}.t_batch WHERE a = 4", ks), &[], ) .await .unwrap() - .rows + .rows_typed() + .unwrap() + .collect::>() .unwrap(); - let results: Vec<(i32, i32, &String)> = rs - .iter() - .map(|r| { - let a = r.columns[0].as_ref().unwrap().as_int().unwrap(); - let b = r.columns[1].as_ref().unwrap().as_int().unwrap(); - let c = r.columns[2].as_ref().unwrap().as_text().unwrap(); - (a, b, c) - }) - .collect(); - assert_eq!(results, vec![(4, 20, &String::from("foobar"))]); + assert_eq!(results, vec![(4, 20, String::from("foobar"))]); } #[tokio::test] @@ -516,22 +485,16 @@ async fn test_token_calculation() { let serialized_values = prepared_statement.serialize_values(&values).unwrap(); session.execute(&prepared_statement, &values).await.unwrap(); - let rs = session + let (value,): (i64,) = session .query( format!("SELECT token(a) FROM {}.t3 WHERE a = ?", ks), &values, ) .await .unwrap() - .rows + .single_row_typed() .unwrap(); - let token = Token { - value: rs.first().unwrap().columns[0] - .as_ref() - .unwrap() - .as_bigint() - .unwrap(), - }; + let token = Token { value }; let prepared_token = Murmur3Partitioner .hash_one(&prepared_statement.compute_partition_key(&values).unwrap()); assert_eq!(token, prepared_token); @@ -622,9 +585,8 @@ async fn test_use_keyspace() { .query("SELECT * FROM tab", &[]) .await .unwrap() - .rows + .rows_typed::<(String,)>() .unwrap() - .into_typed::<(String,)>() .map(|res| res.unwrap().0) .collect(); @@ -672,9 +634,8 @@ async fn test_use_keyspace() { .query("SELECT * FROM tab", &[]) .await .unwrap() - .rows + .rows_typed::<(String,)>() .unwrap() - .into_typed::<(String,)>() .map(|res| res.unwrap().0) .collect(); @@ -732,9 +693,8 @@ async fn test_use_keyspace_case_sensitivity() { .query("SELECT * from tab", &[]) .await .unwrap() - .rows + .rows_typed::<(String,)>() .unwrap() - .into_typed::<(String,)>() .map(|row| row.unwrap().0) .collect(); @@ -748,9 +708,8 @@ async fn test_use_keyspace_case_sensitivity() { .query("SELECT * from tab", &[]) .await .unwrap() - .rows + .rows_typed::<(String,)>() .unwrap() - .into_typed::<(String,)>() .map(|row| row.unwrap().0) .collect(); @@ -789,9 +748,8 @@ async fn test_raw_use_keyspace() { .query("SELECT * FROM tab", &[]) .await .unwrap() - .rows + .rows_typed::<(String,)>() .unwrap() - .into_typed::<(String,)>() .map(|res| res.unwrap().0) .collect(); @@ -1084,15 +1042,14 @@ async fn assert_in_tracing_table(session: &Session, tracing_uuid: Uuid) { // If rows are empty perform 8 retries with a 32ms wait in between for _ in 0..8 { - let row_opt = session + let rows_num = session .query(traces_query.clone(), (tracing_uuid,)) .await .unwrap() - .rows - .into_iter() - .next(); + .rows_num() + .unwrap(); - if row_opt.is_some() { + if rows_num > 0 { // Ok there was some row for this tracing_uuid return; } @@ -1207,9 +1164,8 @@ async fn test_timestamp() { ) .await .unwrap() - .rows + .rows_typed::<(String, String, i64)>() .unwrap() - .into_typed::<(String, String, i64)>() .map(Result::unwrap) .collect::>(); results.sort(); @@ -1827,9 +1783,8 @@ async fn test_named_bind_markers() { .query("SELECT pk, ck, v FROM t", &[]) .await .unwrap() - .rows + .rows_typed::<(i32, i32, i32)>() .unwrap() - .into_typed::<(i32, i32, i32)>() .map(|res| res.unwrap()) .collect();