Skip to content

Commit

Permalink
Merge pull request #1135 from wprzytula/fix-batches-to-different-tables
Browse files Browse the repository at this point in the history
frame/result: allow differing TableSpecs in PreparedMetadata
  • Loading branch information
wprzytula authored Dec 5, 2024
2 parents 4b6ad84 + 693c9db commit 92fdd71
Show file tree
Hide file tree
Showing 3 changed files with 45 additions and 73 deletions.
1 change: 1 addition & 0 deletions scylla-cql/src/frame/frame_errors.rs
Original file line number Diff line number Diff line change
Expand Up @@ -425,6 +425,7 @@ pub struct ColumnSpecParseError {
pub enum ColumnSpecParseErrorKind {
#[error("Invalid table spec: {0}")]
TableSpecParseError(#[from] TableSpecParseError),
// TODO: remove this variant before the next major release.
#[error("Table spec differs across columns - got specs: {0:?} and {1:?}")]
TableSpecDiffersAcrossColumns(TableSpec<'static>, TableSpec<'static>),
#[error("Malformed column name: {0}")]
Expand Down
79 changes: 7 additions & 72 deletions scylla-cql/src/frame/response/result.rs
Original file line number Diff line number Diff line change
Expand Up @@ -977,79 +977,22 @@ fn mk_col_spec_parse_error(
}
}

/// Deserializes table spec of a column spec in the borrowed form.
///
/// Checks for equality of table specs across columns, because the protocol
/// does not guarantee that and we want to be sure that the assumption
/// of them being all the same is correct.
/// To this end, the first column's table spec is written to `known_table_spec`
/// and compared with remaining columns' table spec.
///
/// To avoid needless allocations, it is advised to pass `known_table_spec`
/// in the borrowed form, so that cloning it is cheap.
fn deser_table_spec_for_col_spec<'frame>(
buf: &'_ mut &'frame [u8],
global_table_spec_provided: bool,
known_table_spec: &'_ mut Option<TableSpec<'frame>>,
col_idx: usize,
) -> StdResult<TableSpec<'frame>, ColumnSpecParseError> {
let table_spec = match known_table_spec {
// If global table spec was provided, we simply clone it to each column spec.
Some(ref known_spec) if global_table_spec_provided => known_spec.clone(),

// Else, we deserialize the table spec for a column and, if we already know some
// previous spec (i.e. that of the first column), we perform equality check
// against it.
Some(_) | None => {
let table_spec =
deser_table_spec(buf).map_err(|err| mk_col_spec_parse_error(col_idx, err))?;

if let Some(ref known_spec) = known_table_spec {
// We assume that for each column, table spec is the same.
// As this is not guaranteed by the CQL protocol specification but only by how
// Cassandra and ScyllaDB work (no support for joins), we perform a sanity check here.
if known_spec.table_name != table_spec.table_name
|| known_spec.ks_name != table_spec.ks_name
{
return Err(mk_col_spec_parse_error(
col_idx,
ColumnSpecParseErrorKind::TableSpecDiffersAcrossColumns(
known_spec.clone().into_owned(),
table_spec.into_owned(),
),
));
}
} else {
// Once we have read the first column spec, we save its table spec
// in order to verify its equality with other columns'.
*known_table_spec = Some(table_spec.clone());
}

table_spec
}
};

Ok(table_spec)
}

fn deser_col_specs_generic<'frame, 'result>(
buf: &mut &'frame [u8],
global_table_spec: Option<TableSpec<'frame>>,
col_count: usize,
make_col_spec: fn(&'frame str, ColumnType<'result>, TableSpec<'frame>) -> ColumnSpec<'result>,
deser_type: fn(&mut &'frame [u8]) -> StdResult<ColumnType<'result>, CqlTypeParseError>,
) -> StdResult<Vec<ColumnSpec<'result>>, ColumnSpecParseError> {
let global_table_spec_provided = global_table_spec.is_some();
let mut known_table_spec = global_table_spec;

let mut col_specs = Vec::with_capacity(col_count);
for col_idx in 0..col_count {
let table_spec = deser_table_spec_for_col_spec(
buf,
global_table_spec_provided,
&mut known_table_spec,
col_idx,
)?;
let table_spec = match global_table_spec {
// If global table spec was provided, we simply clone it to each column spec.
Some(ref known_spec) => known_spec.clone(),

// Else, we deserialize the table spec for a column.
None => deser_table_spec(buf).map_err(|err| mk_col_spec_parse_error(col_idx, err))?,
};

let name = types::read_string(buf).map_err(|err| mk_col_spec_parse_error(col_idx, err))?;
let typ = deser_type(buf).map_err(|err| mk_col_spec_parse_error(col_idx, err))?;
Expand All @@ -1062,10 +1005,6 @@ fn deser_col_specs_generic<'frame, 'result>(
/// Deserializes col specs (part of ResultMetadata or PreparedMetadata)
/// in the borrowed form.
///
/// Checks for equality of table specs across columns, because the protocol
/// does not guarantee that and we want to be sure that the assumption
/// of them being all the same is correct.
///
/// To avoid needless allocations, it is advised to pass `global_table_spec`
/// in the borrowed form, so that cloning it is cheap.
fn deser_col_specs_borrowed<'frame>(
Expand All @@ -1085,10 +1024,6 @@ fn deser_col_specs_borrowed<'frame>(
/// Deserializes col specs (part of ResultMetadata or PreparedMetadata)
/// in the owned form.
///
/// Checks for equality of table specs across columns, because the protocol
/// does not guarantee that and we want to be sure that the assumption
/// of them being all the same is correct.
///
/// To avoid needless allocations, it is advised to pass `global_table_spec`
/// in the borrowed form, so that cloning it is cheap.
fn deser_col_specs_owned<'frame>(
Expand Down
38 changes: 37 additions & 1 deletion scylla/src/transport/session_test.rs
Original file line number Diff line number Diff line change
Expand Up @@ -457,7 +457,7 @@ async fn test_batch() {
.await
.unwrap();

// TODO: Add API, that supports binding values to statements in batch creation process,
// TODO: Add API that supports binding values to statements in batch creation process,
// to avoid problem of statements/values count mismatch
use crate::batch::Batch;
let mut batch: Batch = Default::default();
Expand Down Expand Up @@ -537,6 +537,42 @@ async fn test_batch() {
assert_eq!(results, vec![(4, 20, String::from("foobar"))]);
}

// This is a regression test for #1134.
#[tokio::test]
async fn test_batch_to_multiple_tables() {
setup_tracing();
let session = create_new_session_builder().build().await.unwrap();
let ks = unique_keyspace_name();

session.ddl(format!("CREATE KEYSPACE IF NOT EXISTS {} WITH REPLICATION = {{'class' : 'NetworkTopologyStrategy', 'replication_factor' : 1}}", ks)).await.unwrap();
session.use_keyspace(&ks, true).await.unwrap();
session
.ddl("CREATE TABLE IF NOT EXISTS t_batch1 (a int, b int, c text, primary key (a, b))")
.await
.unwrap();
session
.ddl("CREATE TABLE IF NOT EXISTS t_batch2 (a int, b int, c text, primary key (a, b))")
.await
.unwrap();

let prepared_statement = session
.prepare(
"
BEGIN BATCH
INSERT INTO t_batch1 (a, b, c) VALUES (?, ?, ?);
INSERT INTO t_batch2 (a, b, c) VALUES (?, ?, ?);
APPLY BATCH;
",
)
.await
.unwrap();

session
.execute_unpaged(&prepared_statement, (1, 2, "ala", 4, 5, "ma"))
.await
.unwrap();
}

#[tokio::test]
async fn test_token_calculation() {
setup_tracing();
Expand Down

0 comments on commit 92fdd71

Please sign in to comment.