diff --git a/scylla/tests/integration/main.rs b/scylla/tests/integration/main.rs index 4b8920309a..7f09ae2c5a 100644 --- a/scylla/tests/integration/main.rs +++ b/scylla/tests/integration/main.rs @@ -6,4 +6,5 @@ mod new_session; mod retries; mod shards; mod silent_prepare_query; +mod skip_metadata_optimization; pub(crate) mod utils; diff --git a/scylla/tests/integration/skip_metadata_optimization.rs b/scylla/tests/integration/skip_metadata_optimization.rs new file mode 100644 index 0000000000..898c9e2897 --- /dev/null +++ b/scylla/tests/integration/skip_metadata_optimization.rs @@ -0,0 +1,80 @@ +use crate::utils::test_with_3_node_cluster; +use scylla::transport::session::Session; +use scylla::SessionBuilder; +use scylla::{prepared_statement::PreparedStatement, test_utils::unique_keyspace_name}; +use scylla_cql::frame::types; +use scylla_proxy::{ + Condition, ProxyError, Reaction, ResponseFrame, ResponseReaction, ShardAwareness, TargetShard, + WorkerError, +}; +use std::sync::Arc; + +#[tokio::test] +#[ntest::timeout(30000)] +#[cfg(not(scylla_cloud_tests))] +async fn test_skip_result_metadata() { + use scylla_proxy::{ResponseOpcode, ResponseRule}; + + const NO_METADATA_FLAG: i32 = 0x0004; + + let res = test_with_3_node_cluster(ShardAwareness::QueryNode, |proxy_uris, translation_map, mut running_proxy| async move { + // DB preparation phase + let session: Session = SessionBuilder::new() + .known_node(proxy_uris[0].as_str()) + .address_translator(Arc::new(translation_map)) + .build() + .await + .unwrap(); + + let ks = unique_keyspace_name(); + session.query(format!("CREATE KEYSPACE IF NOT EXISTS {} WITH REPLICATION = {{'class' : 'NetworkTopologyStrategy', 'replication_factor' : 3}}", ks), &[]).await.unwrap(); + session.use_keyspace(ks, false).await.unwrap(); + session + .query("CREATE TABLE t (a int primary key, b int, c int)", &[]) + .await + .unwrap(); + session.query("INSERT INTO t (a, b, c) VALUES (1, 2, 3)", &[]).await.unwrap(); + + let mut prepared = session.prepare("SELECT a, b, c FROM t").await.unwrap(); + + let (tx, mut rx) = tokio::sync::mpsc::unbounded_channel(); + + let x = ResponseRule(Condition::ResponseOpcode(ResponseOpcode::Result), ResponseReaction::noop().with_feedback_when_performed(tx)); + running_proxy.running_nodes[0].change_response_rules(Some(vec![x])); + + async fn test_with_flags_predicate( + session: &Session, + prepared: &PreparedStatement, + rx: &mut tokio::sync::mpsc::UnboundedReceiver<(ResponseFrame, Option)>, + predicate: impl FnOnce(i32) -> bool + ) { + session.execute(prepared, &[]).await.unwrap(); + + let (frame, _shard) = rx.recv().await.unwrap(); + let mut buf = &*frame.body; + + match types::read_int(&mut buf).unwrap() { + 0x0002 => (), + _ => panic!("Invalid result type"), + } + let result_metadata_flags = types::read_int(&mut buf).unwrap(); + assert!(predicate(result_metadata_flags)); + } + + // Verify that server sends metadata when driver doesn't send SKIP_METADATA flag. + prepared.set_skip_result_metadata(false); + test_with_flags_predicate(&session, &prepared, &mut rx, |flags| flags & NO_METADATA_FLAG == 0).await; + + // Verify that server doesn't send metadata when driver sends SKIP_METADATA flag. + prepared.set_skip_result_metadata(true); + test_with_flags_predicate(&session, &prepared, &mut rx, |flags| flags & NO_METADATA_FLAG != 0).await; + + running_proxy + }).await; + + match res { + Ok(()) => (), + Err(ProxyError::Worker(WorkerError::DriverDisconnected(_))) => (), + Err(err) => panic!("{}", err), + } +}