Skip to content

Commit

Permalink
Faulty integration test for tablet shard awareness
Browse files Browse the repository at this point in the history
  • Loading branch information
Lorak-mmk committed Mar 9, 2024
1 parent f6490a5 commit 466af8e
Show file tree
Hide file tree
Showing 2 changed files with 225 additions and 0 deletions.
2 changes: 2 additions & 0 deletions scylla/tests/integration/main.rs
Original file line number Diff line number Diff line change
Expand Up @@ -7,4 +7,6 @@ mod retries;
mod shards;
mod silent_prepare_query;
mod skip_metadata_optimization;
#[cfg(feature = "unstable-tablets")]
mod tablets;
pub(crate) mod utils;
223 changes: 223 additions & 0 deletions scylla/tests/integration/tablets.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,223 @@
use std::sync::Arc;

use crate::utils::test_with_3_node_cluster;

use scylla::test_utils::unique_keyspace_name;
use scylla::transport::Node;
use scylla::{IntoTypedRows, Session};

use scylla_proxy::{
Condition, ProxyError, Reaction, ResponseFrame, ResponseOpcode, ResponseReaction, ResponseRule,
ShardAwareness, TargetShard, WorkerError,
};

use tokio::sync::mpsc;
use uuid::Uuid;

#[derive(scylla::FromRow)]
struct SelectedTablet {
last_token: i64,
replicas: Vec<(Uuid, i32)>,
}

struct Tablet {
first_token: i64,
last_token: i64,
replicas: Vec<(Arc<Node>, i32)>,
}

async fn get_tablets(session: &Session, ks: String, table: String) -> Vec<Tablet> {
let cluster_data = session.get_cluster_data();
let endpoints = cluster_data.get_nodes_info();
for endpoint in endpoints.iter() {
println!(
"id: {}, address: {}",
endpoint.host_id,
endpoint.address.ip()
);
}

let selected_tablets_rows = session.query(
"select last_token, replicas from system.tablets WHERE keyspace_name = ? and table_name = ? ALLOW FILTERING",
&(ks.as_str(), table.as_str())).await.unwrap().rows.unwrap();

let mut selected_tablets = selected_tablets_rows
.into_typed::<SelectedTablet>()
.map(|x| x.unwrap())
.collect::<Vec<_>>();
selected_tablets.sort_unstable_by(|a, b| a.last_token.cmp(&b.last_token));

let mut tablets = Vec::new();
let mut first_token = i64::MIN;
for tablet in selected_tablets {
let replicas = tablet
.replicas
.iter()
.map(|(uuid, shard)| {
(
Arc::clone(
endpoints
.get(endpoints.iter().position(|e| e.host_id == *uuid).unwrap())
.unwrap(),
),
*shard,
)
})
.collect();
let raw_tablet = Tablet {
first_token,
last_token: tablet.last_token,
replicas,
};
first_token = tablet.last_token.wrapping_add(1);
tablets.push(raw_tablet);
}

tablets
}

#[tokio::test]
#[ntest::timeout(30000)]
#[cfg(not(scylla_cloud_tests))]
async fn test_tablet_shard_awareness() {
const TABLET_COUNT: usize = 16;

let res = test_with_3_node_cluster(
ShardAwareness::QueryNode,
|proxy_uris, translation_map, mut running_proxy| async move {
let session = scylla::SessionBuilder::new()
.known_node(proxy_uris[0].as_str())
.address_translator(Arc::new(translation_map))
.build()
.await
.unwrap();
let ks = unique_keyspace_name();

/* Prepare schema */
session
.query(
format!(
"CREATE KEYSPACE IF NOT EXISTS {}
WITH REPLICATION = {{'class' : 'NetworkTopologyStrategy', 'replication_factor' : 3}}
AND tablets = {{ 'initial': {} }}",
ks,
TABLET_COUNT
),
&[],
)
.await
.unwrap();
session
.query(
format!(
"CREATE TABLE IF NOT EXISTS {}.t (a int, b int, c text, primary key (a, b))",
ks
),
&[],
)
.await
.unwrap();

let tablets = get_tablets(&session, ks.clone(), "t".to_string()).await;

// Print information about tablets, useful for debugging
for tablet in tablets.iter() {
println!("[{}, {}]: {:?}",
tablet.first_token,
tablet.last_token,
tablet.replicas.iter().map(|(replica, shard)| {
(replica.address.ip(), shard)
}).collect::<Vec<_>>());
}

let prepared = session
.prepare(format!(
"INSERT INTO {}.t (a, b, c) VALUES (?, ?, 'abc')",
ks
))
.await
.unwrap();

// Here we calculate a PK per token
let mut present_tablets = [false; TABLET_COUNT];
let mut value_lists = vec![];
for i in 0..1000 {
let token_value = prepared.calculate_token(&(i, 1)).unwrap().unwrap().value;
let tablet_idx = tablets.iter().position(|tablet| tablet.first_token <= token_value && token_value <= tablet.last_token).unwrap();
if !present_tablets[tablet_idx] {
let values = (i, 1);
let tablet = &tablets[tablet_idx];
println!("Values: {:?}, token: {}, tablet index: {}, tablet: [{}, {}], replicas: {:?}",
values,
token_value,
tablet_idx,
tablet.first_token,
tablet.last_token,
tablet.replicas.iter().map(|(replica, shard)| {
(replica.address.ip(), shard)
}).collect::<Vec<_>>()
);
value_lists.push(values);
present_tablets[tablet_idx] = true;
}
}

assert!(present_tablets.iter().all(|x| *x));


fn count_tablet_feedbacks (
rx: &mut mpsc::UnboundedReceiver<(ResponseFrame, Option<TargetShard>)>,
) -> usize {
std::iter::from_fn(|| rx.try_recv().ok()).map(|(frame, _shard)| {
let response = scylla_cql::frame::parse_response_body_extensions(frame.params.flags, None, frame.body).unwrap();
match response.custom_payload {
Some(map) => map.contains_key("tablets-routing-v1"),
None => false
}
}).filter(|b| *b).count()
}

let (feedback_txs, mut feedback_rxs): (Vec<_>, Vec<_>) = (0..3)
.map(|_| mpsc::unbounded_channel::<(ResponseFrame, Option<TargetShard>)>())
.unzip();
for (i, tx) in feedback_txs.iter().cloned().enumerate() {
running_proxy.running_nodes[i].change_response_rules(Some(vec![ResponseRule(
Condition::ResponseOpcode(ResponseOpcode::Result)
.and(Condition::not(Condition::ConnectionRegisteredAnyEvent)),
ResponseReaction::noop().with_feedback_when_performed(tx),
)]));
}

// At first, driver has no info about tablets, so for each key we expect
// to send it to wrong replica at least once.
for values in value_lists.iter() {
println!("{:?}, token: {}", values, prepared.calculate_token(&values).unwrap().unwrap().value);
for _ in 0..10 {
session.execute(&prepared, values).await.unwrap();
}

let feedbacks: usize = feedback_rxs.iter_mut().map(count_tablet_feedbacks).sum();
assert!(feedbacks > 0);
}

// Here we should have complete info about all the tablets, so there should
// be no occurence of sending to wrong replica.
for values in value_lists.iter() {
println!("{:?}, token: {}", values, prepared.calculate_token(&values).unwrap().unwrap().value);
for _ in 0..10 {
session.execute(&prepared, values).await.unwrap();
}
let feedbacks: usize = feedback_rxs.iter_mut().map(count_tablet_feedbacks).sum();
assert_eq!(feedbacks, 0);
}

running_proxy
},
)
.await;
match res {
Ok(()) => (),
Err(ProxyError::Worker(WorkerError::DriverDisconnected(_))) => (),
Err(err) => panic!("{}", err),
}
}

0 comments on commit 466af8e

Please sign in to comment.