Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Decouple table and column specs #956

Closed
wants to merge 42 commits into from
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
Show all changes
42 commits
Select commit Hold shift + click to select a range
4fbb315
result: reposition and comment some code
wprzytula Oct 11, 2024
3cf54f8
result: fix lifetimes in ColumnSpec
wprzytula Oct 22, 2024
7e77693
result: introduce RawRows & friends
piodul Mar 15, 2023
5a49730
result: introduce RawRowsLendingIterator
wprzytula Aug 14, 2024
5339d27
treewide: rename QueryResult -> LegacyQueryResult
piodul Mar 14, 2023
8020cfd
transport: introduce new QueryResult
piodul Mar 13, 2023
d7ce383
result: metadata serialization utils for tests
wprzytula Oct 16, 2024
0af797f
transport: add tests for new QueryResult
wprzytula Oct 16, 2024
f255648
session,iterator: no longer record rows size & count
wprzytula Oct 11, 2024
691619d
treewide: propagate RawRows/new QueryResult
piodul Mar 13, 2023
cae545c
iterator: rename (Typed)RowIterator to Legacy(Typed)RowIterator
piodul Mar 13, 2023
e2d791f
iterator: introduce poll_next_internal
piodul Feb 28, 2023
fd90577
iterator: introduce ready_some_ok! macro
piodul Mar 13, 2023
1fa8213
iterator: introduce poll_next_page
piodul Mar 13, 2023
920244e
iterator: reorder code for better grouping
wprzytula Aug 13, 2024
ab79c39
iterator: adjust to the new deserialization framework
piodul Mar 14, 2023
5282c44
iterator: typed API for new deserialization framework
wprzytula Oct 22, 2024
7ec3c8e
result: delete legacy Rows type
wprzytula Oct 11, 2024
1692a67
session,iterator: record raw metadata&rows size
wprzytula Oct 28, 2024
778c392
result: make ColumnSpec::borrowed `const`
wprzytula Oct 30, 2024
d59d593
WIP: yoked version
wprzytula Nov 4, 2024
10c7b98
FIX: QueryResult
wprzytula Nov 4, 2024
3001dce
treewide: rename Session to LegacySession
piodul Mar 15, 2023
9e19180
session: make generic and introduce "session kind" parameter
piodul Mar 16, 2023
83419ae
session: move query-related methods to a separate block
piodul Mar 16, 2023
f6f29d9
session: re-introduce the Session type as an alias
piodul Mar 16, 2023
01811c1
session_builder: rename build->build_legacy and then reintroduce
piodul Mar 16, 2023
1cc4a9a
tests: scylla_supports_tablets[_legacy] suffix
wprzytula Aug 8, 2024
6d7b080
session: de-genericise internal query/exec functions
piodul Mar 13, 2023
ee34db7
session: return new QueryResult from internal methods
piodul Mar 13, 2023
02f5237
session: add interface methods for the new deser API
piodul Mar 13, 2023
3549783
connection: switch to the new deserialization framework
piodul Mar 14, 2023
dd813de
caching_session: make generic over session APIs
piodul Mar 14, 2023
d3686d0
caching_session: modernize tests
piodul Mar 14, 2023
f5237ad
connection: migrate query_iter to new deserialization framework
wprzytula Mar 12, 2024
eb8b720
{session,tracing}: switch to the new deser framework for tracing info
piodul Mar 17, 2023
8877d37
treewide: switch tests to use the new framework
wprzytula Mar 12, 2024
4fb4922
examples: adjust to use the new interface
piodul Mar 14, 2023
a82fa6d
codewide: migrate doctests to new deser API
wprzytula Aug 14, 2024
a261658
session_test: regression test empty collections deserialization
wprzytula May 21, 2024
5561b17
treewide tests: remove needless vec![] allocations
wprzytula Oct 28, 2024
0409cbe
partial migration to TableSpec out of ColumnSpec
wprzytula Oct 2, 2024
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions Cargo.lock.msrv

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

1 change: 1 addition & 0 deletions examples/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,7 @@ scylla = { path = "../scylla", features = [
"num-bigint-04",
"bigdecimal-04",
] }
scylla-cql = { path = "../scylla-cql" }
tokio = { version = "1.34", features = ["full"] }
tracing = { version = "0.1.25", features = ["log"] }
tracing-subscriber = { version = "0.3.14", features = ["env-filter"] }
Expand Down
3 changes: 2 additions & 1 deletion examples/allocations.rs
Original file line number Diff line number Diff line change
@@ -1,5 +1,6 @@
use anyhow::Result;
use scylla::{statement::prepared_statement::PreparedStatement, Session, SessionBuilder};
use scylla::transport::session::Session;
use scylla::{statement::prepared_statement::PreparedStatement, SessionBuilder};
use std::io::Write;
use std::sync::atomic::{AtomicUsize, Ordering};
use std::sync::Arc;
Expand Down
27 changes: 13 additions & 14 deletions examples/basic.rs
Original file line number Diff line number Diff line change
@@ -1,7 +1,7 @@
use anyhow::Result;
use futures::TryStreamExt;
use scylla::macros::FromRow;
use scylla::frame::response::result::Row;
use scylla::transport::session::Session;
use scylla::DeserializeRow;
use scylla::SessionBuilder;
use std::env;

Expand Down Expand Up @@ -53,39 +53,38 @@ async fn main() -> Result<()> {
let mut iter = session
.query_iter("SELECT a, b, c FROM examples_ks.basic", &[])
.await?
.into_typed::<(i32, i32, String)>();
.into_typed::<(i32, i32, String)>()?;
while let Some((a, b, c)) = iter.try_next().await? {
println!("a, b, c: {}, {}, {}", a, b, c);
}

// Or as custom structs that derive FromRow
#[derive(Debug, FromRow)]
// Or as custom structs that derive DeserializeRow
#[allow(unused)]
#[derive(Debug, DeserializeRow)]
struct RowData {
_a: i32,
_b: Option<i32>,
_c: String,
a: i32,
b: Option<i32>,
c: String,
}

let mut iter = session
.query_iter("SELECT a, b, c FROM examples_ks.basic", &[])
.await?
.into_typed::<RowData>();
.into_typed::<RowData>()?;
while let Some(row_data) = iter.try_next().await? {
println!("row_data: {:?}", row_data);
}

// Or simply as untyped rows
let mut iter = session
.query_iter("SELECT a, b, c FROM examples_ks.basic", &[])
.await?;
while let Some(row) = iter.try_next().await? {
.await?
.into_typed::<Row>()?;
while let Some(row) = iter.next().await.transpose()? {
let a = row.columns[0].as_ref().unwrap().as_int().unwrap();
let b = row.columns[1].as_ref().unwrap().as_int().unwrap();
let c = row.columns[2].as_ref().unwrap().as_text().unwrap();
println!("a, b, c: {}, {}, {}", a, b, c);

// Alternatively each row can be parsed individually
// let (a2, b2, c2) = row.into_typed::<(i32, i32, String)>() ?;
}

let metrics = session.get_metrics();
Expand Down
4 changes: 3 additions & 1 deletion examples/compare-tokens.rs
Original file line number Diff line number Diff line change
Expand Up @@ -51,7 +51,9 @@ async fn main() -> Result<()> {
(pk,),
)
.await?
.single_row_typed::<(i64,)>()?;
.into_rows_result()?
.expect("Got not Rows result")
.single_row()?;
assert_eq!(t, qt);
println!("token for {}: {}", pk, t);
}
Expand Down
19 changes: 9 additions & 10 deletions examples/cql-time-types.rs
Original file line number Diff line number Diff line change
Expand Up @@ -3,7 +3,6 @@

use anyhow::Result;
use chrono::{DateTime, NaiveDate, NaiveTime, Utc};
use futures::{StreamExt, TryStreamExt};
use scylla::frame::response::result::CqlValue;
use scylla::frame::value::{CqlDate, CqlTime, CqlTimestamp};
use scylla::transport::session::Session;
Expand Down Expand Up @@ -44,7 +43,7 @@ async fn main() -> Result<()> {
let mut iter = session
.query_iter("SELECT d from examples_ks.dates", &[])
.await?
.into_typed::<(NaiveDate,)>();
.into_typed::<(NaiveDate,)>()?;
while let Some(row_result) = iter.next().await {
let (read_date,): (NaiveDate,) = match row_result {
Ok(read_date) => read_date,
Expand All @@ -66,7 +65,7 @@ async fn main() -> Result<()> {
let mut iter = session
.query_iter("SELECT d from examples_ks.dates", &[])
.await?
.into_typed::<(time::Date,)>();
.into_typed::<(time::Date,)>()?;
while let Some(row_result) = iter.next().await {
let (read_date,): (time::Date,) = match row_result {
Ok(read_date) => read_date,
Expand All @@ -88,7 +87,7 @@ async fn main() -> Result<()> {
let mut iter = session
.query_iter("SELECT d from examples_ks.dates", &[])
.await?
.into_typed::<(CqlValue,)>();
.into_typed::<(CqlValue,)>()?;
while let Some(row_result) = iter.next().await {
let read_days: u32 = match row_result {
Ok((CqlValue::Date(CqlDate(days)),)) => days,
Expand Down Expand Up @@ -124,7 +123,7 @@ async fn main() -> Result<()> {
let mut iter = session
.query_iter("SELECT d from examples_ks.times", &[])
.await?
.into_typed::<(NaiveTime,)>();
.into_typed::<(NaiveTime,)>()?;
while let Some((read_time,)) = iter.try_next().await? {
println!("Parsed a time into chrono::NaiveTime: {:?}", read_time);
}
Expand All @@ -139,7 +138,7 @@ async fn main() -> Result<()> {
let mut iter = session
.query_iter("SELECT d from examples_ks.times", &[])
.await?
.into_typed::<(time::Time,)>();
.into_typed::<(time::Time,)>()?;
while let Some((read_time,)) = iter.try_next().await? {
println!("Parsed a time into time::Time: {:?}", read_time);
}
Expand All @@ -154,7 +153,7 @@ async fn main() -> Result<()> {
let mut iter = session
.query_iter("SELECT d from examples_ks.times", &[])
.await?
.into_typed::<(CqlTime,)>();
.into_typed::<(CqlTime,)>()?;
while let Some((read_time,)) = iter.try_next().await? {
println!("Read a time as raw nanos: {:?}", read_time);
}
Expand Down Expand Up @@ -185,7 +184,7 @@ async fn main() -> Result<()> {
let mut iter = session
.query_iter("SELECT d from examples_ks.timestamps", &[])
.await?
.into_typed::<(DateTime<Utc>,)>();
.into_typed::<(DateTime<Utc>,)>()?;
while let Some((read_time,)) = iter.try_next().await? {
println!(
"Parsed a timestamp into chrono::DateTime<chrono::Utc>: {:?}",
Expand All @@ -206,7 +205,7 @@ async fn main() -> Result<()> {
let mut iter = session
.query_iter("SELECT d from examples_ks.timestamps", &[])
.await?
.into_typed::<(time::OffsetDateTime,)>();
.into_typed::<(time::OffsetDateTime,)>()?;
while let Some((read_time,)) = iter.try_next().await? {
println!(
"Parsed a timestamp into time::OffsetDateTime: {:?}",
Expand All @@ -227,7 +226,7 @@ async fn main() -> Result<()> {
let mut iter = session
.query_iter("SELECT d from examples_ks.timestamps", &[])
.await?
.into_typed::<(CqlTimestamp,)>();
.into_typed::<(CqlTimestamp,)>()?;
while let Some((read_time,)) = iter.try_next().await? {
println!("Read a timestamp as raw millis: {:?}", read_time);
}
Expand Down
43 changes: 25 additions & 18 deletions examples/cqlsh-rs.rs
Original file line number Diff line number Diff line change
Expand Up @@ -3,8 +3,11 @@ use rustyline::completion::{Completer, Pair};
use rustyline::error::ReadlineError;
use rustyline::{CompletionType, Config, Context, Editor};
use rustyline_derive::{Helper, Highlighter, Hinter, Validator};
use scylla::transport::session::Session;
use scylla::transport::Compression;
use scylla::{QueryResult, Session, SessionBuilder};
use scylla::QueryRowsResult;
use scylla::SessionBuilder;
use scylla_cql::frame::response::result::Row;
use std::env;

#[derive(Helper, Highlighter, Validator, Hinter)]
Expand Down Expand Up @@ -173,23 +176,24 @@ impl Completer for CqlHelper {
}
}

fn print_result(result: &QueryResult) {
if result.rows.is_none() {
println!("OK");
return;
}
for row in result.rows.as_ref().unwrap() {
for column in &row.columns {
print!("|");
print!(
" {:16}",
match column {
None => "null".to_owned(),
Some(value) => format!("{:?}", value),
}
);
fn print_result(result: Option<&QueryRowsResult>) {
if let Some(rows_result) = result {
for row in rows_result.rows::<Row>().unwrap() {
let row = row.unwrap();
for column in &row.columns {
print!("|");
print!(
" {:16}",
match column {
None => "null".to_owned(),
Some(value) => format!("{:?}", value),
}
);
}
println!("|")
}
println!("|")
} else {
println!("OK");
}
}

Expand Down Expand Up @@ -222,7 +226,10 @@ async fn main() -> Result<()> {
let maybe_res = session.query_unpaged(line, &[]).await;
match maybe_res {
Err(err) => println!("Error: {}", err),
Ok(res) => print_result(&res),
Ok(res) => {
let rows_res = res.into_rows_result()?;
print_result(rows_res.as_ref())
}
}
}
Err(ReadlineError::Interrupted) => continue,
Expand Down
5 changes: 4 additions & 1 deletion examples/custom_deserialization.rs
Original file line number Diff line number Diff line change
Expand Up @@ -2,7 +2,8 @@ use anyhow::Result;
use scylla::cql_to_rust::{FromCqlVal, FromCqlValError};
use scylla::frame::response::result::CqlValue;
use scylla::macros::impl_from_cql_value_from_method;
use scylla::{Session, SessionBuilder};
use scylla::transport::session::Session;
use scylla::SessionBuilder;
use std::env;

#[tokio::main]
Expand Down Expand Up @@ -46,6 +47,7 @@ async fn main() -> Result<()> {
(),
)
.await?
.into_legacy_result()?
.single_row_typed::<(MyType,)>()?;
assert_eq!(v, MyType("asdf".to_owned()));

Expand Down Expand Up @@ -73,6 +75,7 @@ async fn main() -> Result<()> {
(),
)
.await?
.into_legacy_result()?
.single_row_typed::<(MyOtherType,)>()?;
assert_eq!(v, MyOtherType("asdf".to_owned()));

Expand Down
25 changes: 17 additions & 8 deletions examples/get_by_name.rs
Original file line number Diff line number Diff line change
@@ -1,6 +1,7 @@
use anyhow::{anyhow, Result};
use anyhow::{anyhow, Context as _, Result};
use scylla::transport::session::Session;
use scylla::SessionBuilder;
use scylla_cql::frame::response::result::Row;
use std::env;

#[tokio::main]
Expand Down Expand Up @@ -35,18 +36,26 @@ async fn main() -> Result<()> {
)
.await?;

let query_result = session
let rows_result = session
.query_unpaged("SELECT pk, ck, value FROM examples_ks.get_by_name", &[])
.await?;
let (ck_idx, _) = query_result
.get_column_spec("ck")
.await?
.into_rows_result()?
.context("Response is not of Rows type")?;
let col_specs = rows_result.column_specs();
let (ck_idx, _) = col_specs
.get_by_name("ck")
.ok_or_else(|| anyhow!("No ck column found"))?;
let (value_idx, _) = query_result
.get_column_spec("value")
let (value_idx, _) = col_specs
.get_by_name("value")
.ok_or_else(|| anyhow!("No value column found"))?;
let rows = rows_result
.rows::<Row>()
.unwrap()
.collect::<Result<Vec<_>, _>>()
.unwrap();
println!("ck | value");
println!("---------------------");
for row in query_result.rows.ok_or_else(|| anyhow!("no rows found"))? {
for row in rows {
println!("{:?} | {:?}", row.columns[ck_idx], row.columns[value_idx]);
}

Expand Down
3 changes: 1 addition & 2 deletions examples/logging_log.rs
Original file line number Diff line number Diff line change
@@ -1,6 +1,5 @@
use anyhow::Result;
use scylla::transport::session::Session;
use scylla::SessionBuilder;
use scylla::{Session, SessionBuilder};
use std::env;
use tracing::info;

Expand Down
1 change: 0 additions & 1 deletion examples/query_history.rs
Original file line number Diff line number Diff line change
@@ -1,7 +1,6 @@
//! This example shows how to collect history of query execution.

use anyhow::Result;
use futures::StreamExt;
use scylla::history::{HistoryCollector, StructuredHistory};
use scylla::query::Query;
use scylla::transport::session::Session;
Expand Down
3 changes: 1 addition & 2 deletions examples/schema_agreement.rs
Original file line number Diff line number Diff line change
@@ -1,5 +1,4 @@
use anyhow::{bail, Result};
use futures::TryStreamExt;
use scylla::transport::errors::QueryError;
use scylla::transport::session::Session;
use scylla::SessionBuilder;
Expand Down Expand Up @@ -70,7 +69,7 @@ async fn main() -> Result<()> {
let mut iter = session
.query_iter("SELECT a, b, c FROM examples_ks.schema_agreement", &[])
.await?
.into_typed::<(i32, i32, String)>();
.into_typed::<(i32, i32, String)>()?;
while let Some((a, b, c)) = iter.try_next().await? {
println!("a, b, c: {}, {}, {}", a, b, c);
}
Expand Down
15 changes: 11 additions & 4 deletions examples/select-paging.rs
Original file line number Diff line number Diff line change
@@ -1,5 +1,4 @@
use anyhow::Result;
use futures::stream::StreamExt;
use scylla::statement::PagingState;
use scylla::{query::Query, Session, SessionBuilder};
use std::env;
Expand Down Expand Up @@ -35,7 +34,7 @@ async fn main() -> Result<()> {
let mut rows_stream = session
.query_iter("SELECT a, b, c FROM examples_ks.select_paging", &[])
.await?
.into_typed::<(i32, i32, String)>();
.into_typed::<(i32, i32, String)>()?;

while let Some(next_row_res) = rows_stream.next().await {
let (a, b, c) = next_row_res?;
Expand All @@ -51,10 +50,14 @@ async fn main() -> Result<()> {
.query_single_page(paged_query.clone(), &[], paging_state)
.await?;

let res = res
.into_rows_result()?
.expect("Got result different than Rows");

println!(
"Paging state: {:#?} ({} rows)",
paging_state_response,
res.rows_num()?,
res.rows_num(),
);

match paging_state_response.into_paging_control_flow() {
Expand All @@ -81,10 +84,14 @@ async fn main() -> Result<()> {
.execute_single_page(&paged_prepared, &[], paging_state)
.await?;

let res = res
.into_rows_result()?
.expect("Got result different than Rows");

println!(
"Paging state from the prepared statement execution: {:#?} ({} rows)",
paging_state_response,
res.rows_num()?,
res.rows_num(),
);

match paging_state_response.into_paging_control_flow() {
Expand Down
Loading
Loading