diff --git a/scylla/tests/integration/main.rs b/scylla/tests/integration/main.rs index 7f09ae2c5a..fdaa8d9e4c 100644 --- a/scylla/tests/integration/main.rs +++ b/scylla/tests/integration/main.rs @@ -7,4 +7,6 @@ mod retries; mod shards; mod silent_prepare_query; mod skip_metadata_optimization; +#[cfg(feature = "unstable-tablets")] +mod tablets; pub(crate) mod utils; diff --git a/scylla/tests/integration/tablets.rs b/scylla/tests/integration/tablets.rs new file mode 100644 index 0000000000..a9cc9019a2 --- /dev/null +++ b/scylla/tests/integration/tablets.rs @@ -0,0 +1,223 @@ +use std::sync::Arc; + +use crate::utils::test_with_3_node_cluster; + +use scylla::test_utils::unique_keyspace_name; +use scylla::transport::Node; +use scylla::{IntoTypedRows, Session}; + +use scylla_proxy::{ + Condition, ProxyError, Reaction, ResponseFrame, ResponseOpcode, ResponseReaction, ResponseRule, + ShardAwareness, TargetShard, WorkerError, +}; + +use tokio::sync::mpsc; +use uuid::Uuid; + +#[derive(scylla::FromRow)] +struct SelectedTablet { + last_token: i64, + replicas: Vec<(Uuid, i32)>, +} + +struct Tablet { + first_token: i64, + last_token: i64, + replicas: Vec<(Arc, i32)>, +} + +async fn get_tablets(session: &Session, ks: String, table: String) -> Vec { + let cluster_data = session.get_cluster_data(); + let endpoints = cluster_data.get_nodes_info(); + for endpoint in endpoints.iter() { + println!( + "id: {}, address: {}", + endpoint.host_id, + endpoint.address.ip() + ); + } + + let selected_tablets_rows = session.query( + "select last_token, replicas from system.tablets WHERE keyspace_name = ? and table_name = ? ALLOW FILTERING", + &(ks.as_str(), table.as_str())).await.unwrap().rows.unwrap(); + + let mut selected_tablets = selected_tablets_rows + .into_typed::() + .map(|x| x.unwrap()) + .collect::>(); + selected_tablets.sort_unstable_by(|a, b| a.last_token.cmp(&b.last_token)); + + let mut tablets = Vec::new(); + let mut first_token = i64::MIN; + for tablet in selected_tablets { + let replicas = tablet + .replicas + .iter() + .map(|(uuid, shard)| { + ( + Arc::clone( + endpoints + .get(endpoints.iter().position(|e| e.host_id == *uuid).unwrap()) + .unwrap(), + ), + *shard, + ) + }) + .collect(); + let raw_tablet = Tablet { + first_token, + last_token: tablet.last_token, + replicas, + }; + first_token = tablet.last_token.wrapping_add(1); + tablets.push(raw_tablet); + } + + tablets +} + +#[tokio::test] +#[ntest::timeout(30000)] +#[cfg(not(scylla_cloud_tests))] +async fn test_tablet_shard_awareness() { + const TABLET_COUNT: usize = 16; + + let res = test_with_3_node_cluster( + ShardAwareness::QueryNode, + |proxy_uris, translation_map, mut running_proxy| async move { + let session = scylla::SessionBuilder::new() + .known_node(proxy_uris[0].as_str()) + .address_translator(Arc::new(translation_map)) + .build() + .await + .unwrap(); + let ks = unique_keyspace_name(); + + /* Prepare schema */ + session + .query( + format!( + "CREATE KEYSPACE IF NOT EXISTS {} + WITH REPLICATION = {{'class' : 'NetworkTopologyStrategy', 'replication_factor' : 3}} + AND tablets = {{ 'initial': {} }}", + ks, + TABLET_COUNT + ), + &[], + ) + .await + .unwrap(); + session + .query( + format!( + "CREATE TABLE IF NOT EXISTS {}.t (a int, b int, c text, primary key (a, b))", + ks + ), + &[], + ) + .await + .unwrap(); + + let tablets = get_tablets(&session, ks.clone(), "t".to_string()).await; + + // Print information about tablets, useful for debugging + for tablet in tablets.iter() { + println!("[{}, {}]: {:?}", + tablet.first_token, + tablet.last_token, + tablet.replicas.iter().map(|(replica, shard)| { + (replica.address.ip(), shard) + }).collect::>()); + } + + let prepared = session + .prepare(format!( + "INSERT INTO {}.t (a, b, c) VALUES (?, ?, 'abc')", + ks + )) + .await + .unwrap(); + + // Here we calculate a PK per token + let mut present_tablets = [false; TABLET_COUNT]; + let mut value_lists = vec![]; + for i in 0..1000 { + let token_value = prepared.calculate_token(&(i, 1)).unwrap().unwrap().value; + let tablet_idx = tablets.iter().position(|tablet| tablet.first_token <= token_value && token_value <= tablet.last_token).unwrap(); + if !present_tablets[tablet_idx] { + let values = (i, 1); + let tablet = &tablets[tablet_idx]; + println!("Values: {:?}, token: {}, tablet index: {}, tablet: [{}, {}], replicas: {:?}", + values, + token_value, + tablet_idx, + tablet.first_token, + tablet.last_token, + tablet.replicas.iter().map(|(replica, shard)| { + (replica.address.ip(), shard) + }).collect::>() + ); + value_lists.push(values); + present_tablets[tablet_idx] = true; + } + } + + assert!(present_tablets.iter().all(|x| *x)); + + + fn count_tablet_feedbacks ( + rx: &mut mpsc::UnboundedReceiver<(ResponseFrame, Option)>, + ) -> usize { + std::iter::from_fn(|| rx.try_recv().ok()).map(|(frame, _shard)| { + let response = scylla_cql::frame::parse_response_body_extensions(frame.params.flags, None, frame.body).unwrap(); + match response.custom_payload { + Some(map) => map.contains_key("tablets-routing-v1"), + None => false + } + }).filter(|b| *b).count() + } + + let (feedback_txs, mut feedback_rxs): (Vec<_>, Vec<_>) = (0..3) + .map(|_| mpsc::unbounded_channel::<(ResponseFrame, Option)>()) + .unzip(); + for (i, tx) in feedback_txs.iter().cloned().enumerate() { + running_proxy.running_nodes[i].change_response_rules(Some(vec![ResponseRule( + Condition::ResponseOpcode(ResponseOpcode::Result) + .and(Condition::not(Condition::ConnectionRegisteredAnyEvent)), + ResponseReaction::noop().with_feedback_when_performed(tx), + )])); + } + + // At first, driver has no info about tablets, so for each key we expect + // to send it to wrong replica at least once. + for values in value_lists.iter() { + println!("{:?}, token: {}", values, prepared.calculate_token(&values).unwrap().unwrap().value); + for _ in 0..10 { + session.execute(&prepared, values).await.unwrap(); + } + + let feedbacks: usize = feedback_rxs.iter_mut().map(count_tablet_feedbacks).sum(); + assert!(feedbacks > 0); + } + + // Here we should have complete info about all the tablets, so there should + // be no occurence of sending to wrong replica. + for values in value_lists.iter() { + println!("{:?}, token: {}", values, prepared.calculate_token(&values).unwrap().unwrap().value); + for _ in 0..10 { + session.execute(&prepared, values).await.unwrap(); + } + let feedbacks: usize = feedback_rxs.iter_mut().map(count_tablet_feedbacks).sum(); + assert_eq!(feedbacks, 0); + } + + running_proxy + }, + ) + .await; + match res { + Ok(()) => (), + Err(ProxyError::Worker(WorkerError::DriverDisconnected(_))) => (), + Err(err) => panic!("{}", err), + } +}