-
Notifications
You must be signed in to change notification settings - Fork 112
Commit
This commit does not belong to any branch on this repository, and may belong to a fork outside of the repository.
Faulty integration test for tablet shard awareness
- Loading branch information
Showing
2 changed files
with
225 additions
and
0 deletions.
There are no files selected for viewing
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,223 @@ | ||
use std::sync::Arc; | ||
|
||
use crate::utils::test_with_3_node_cluster; | ||
|
||
use scylla::test_utils::unique_keyspace_name; | ||
use scylla::transport::Node; | ||
use scylla::{IntoTypedRows, Session}; | ||
|
||
use scylla_proxy::{ | ||
Condition, ProxyError, Reaction, ResponseFrame, ResponseOpcode, ResponseReaction, ResponseRule, | ||
ShardAwareness, TargetShard, WorkerError, | ||
}; | ||
|
||
use tokio::sync::mpsc; | ||
use uuid::Uuid; | ||
|
||
#[derive(scylla::FromRow)] | ||
struct SelectedTablet { | ||
last_token: i64, | ||
replicas: Vec<(Uuid, i32)>, | ||
} | ||
|
||
struct Tablet { | ||
first_token: i64, | ||
last_token: i64, | ||
replicas: Vec<(Arc<Node>, i32)>, | ||
} | ||
|
||
async fn get_tablets(session: &Session, ks: String, table: String) -> Vec<Tablet> { | ||
let cluster_data = session.get_cluster_data(); | ||
let endpoints = cluster_data.get_nodes_info(); | ||
for endpoint in endpoints.iter() { | ||
println!( | ||
"id: {}, address: {}", | ||
endpoint.host_id, | ||
endpoint.address.ip() | ||
); | ||
} | ||
|
||
let selected_tablets_rows = session.query( | ||
"select last_token, replicas from system.tablets WHERE keyspace_name = ? and table_name = ? ALLOW FILTERING", | ||
&(ks.as_str(), table.as_str())).await.unwrap().rows.unwrap(); | ||
|
||
let mut selected_tablets = selected_tablets_rows | ||
.into_typed::<SelectedTablet>() | ||
.map(|x| x.unwrap()) | ||
.collect::<Vec<_>>(); | ||
selected_tablets.sort_unstable_by(|a, b| a.last_token.cmp(&b.last_token)); | ||
|
||
let mut tablets = Vec::new(); | ||
let mut first_token = i64::MIN; | ||
for tablet in selected_tablets { | ||
let replicas = tablet | ||
.replicas | ||
.iter() | ||
.map(|(uuid, shard)| { | ||
( | ||
Arc::clone( | ||
endpoints | ||
.get(endpoints.iter().position(|e| e.host_id == *uuid).unwrap()) | ||
.unwrap(), | ||
), | ||
*shard, | ||
) | ||
}) | ||
.collect(); | ||
let raw_tablet = Tablet { | ||
first_token, | ||
last_token: tablet.last_token, | ||
replicas, | ||
}; | ||
first_token = tablet.last_token.wrapping_add(1); | ||
tablets.push(raw_tablet); | ||
} | ||
|
||
tablets | ||
} | ||
|
||
#[tokio::test] | ||
#[ntest::timeout(30000)] | ||
#[cfg(not(scylla_cloud_tests))] | ||
async fn test_tablet_shard_awareness() { | ||
const TABLET_COUNT: usize = 16; | ||
|
||
let res = test_with_3_node_cluster( | ||
ShardAwareness::QueryNode, | ||
|proxy_uris, translation_map, mut running_proxy| async move { | ||
let session = scylla::SessionBuilder::new() | ||
.known_node(proxy_uris[0].as_str()) | ||
.address_translator(Arc::new(translation_map)) | ||
.build() | ||
.await | ||
.unwrap(); | ||
let ks = unique_keyspace_name(); | ||
|
||
/* Prepare schema */ | ||
session | ||
.query( | ||
format!( | ||
"CREATE KEYSPACE IF NOT EXISTS {} | ||
WITH REPLICATION = {{'class' : 'NetworkTopologyStrategy', 'replication_factor' : 3}} | ||
AND tablets = {{ 'initial': {} }}", | ||
ks, | ||
TABLET_COUNT | ||
), | ||
&[], | ||
) | ||
.await | ||
.unwrap(); | ||
session | ||
.query( | ||
format!( | ||
"CREATE TABLE IF NOT EXISTS {}.t (a int, b int, c text, primary key (a, b))", | ||
ks | ||
), | ||
&[], | ||
) | ||
.await | ||
.unwrap(); | ||
|
||
let tablets = get_tablets(&session, ks.clone(), "t".to_string()).await; | ||
|
||
// Print information about tablets, useful for debugging | ||
for tablet in tablets.iter() { | ||
println!("[{}, {}]: {:?}", | ||
tablet.first_token, | ||
tablet.last_token, | ||
tablet.replicas.iter().map(|(replica, shard)| { | ||
(replica.address.ip(), shard) | ||
}).collect::<Vec<_>>()); | ||
} | ||
|
||
let prepared = session | ||
.prepare(format!( | ||
"INSERT INTO {}.t (a, b, c) VALUES (?, ?, 'abc')", | ||
ks | ||
)) | ||
.await | ||
.unwrap(); | ||
|
||
// Here we calculate a PK per token | ||
let mut present_tablets = [false; TABLET_COUNT]; | ||
let mut value_lists = vec![]; | ||
for i in 0..1000 { | ||
let token_value = prepared.calculate_token(&(i, 1)).unwrap().unwrap().value; | ||
let tablet_idx = tablets.iter().position(|tablet| tablet.first_token <= token_value && token_value <= tablet.last_token).unwrap(); | ||
if !present_tablets[tablet_idx] { | ||
let values = (i, 1); | ||
let tablet = &tablets[tablet_idx]; | ||
println!("Values: {:?}, token: {}, tablet index: {}, tablet: [{}, {}], replicas: {:?}", | ||
values, | ||
token_value, | ||
tablet_idx, | ||
tablet.first_token, | ||
tablet.last_token, | ||
tablet.replicas.iter().map(|(replica, shard)| { | ||
(replica.address.ip(), shard) | ||
}).collect::<Vec<_>>() | ||
); | ||
value_lists.push(values); | ||
present_tablets[tablet_idx] = true; | ||
} | ||
} | ||
|
||
assert!(present_tablets.iter().all(|x| *x)); | ||
|
||
|
||
fn count_tablet_feedbacks ( | ||
rx: &mut mpsc::UnboundedReceiver<(ResponseFrame, Option<TargetShard>)>, | ||
) -> usize { | ||
std::iter::from_fn(|| rx.try_recv().ok()).map(|(frame, _shard)| { | ||
let response = scylla_cql::frame::parse_response_body_extensions(frame.params.flags, None, frame.body).unwrap(); | ||
match response.custom_payload { | ||
Some(map) => map.contains_key("tablets-routing-v1"), | ||
None => false | ||
} | ||
}).filter(|b| *b).count() | ||
} | ||
|
||
let (feedback_txs, mut feedback_rxs): (Vec<_>, Vec<_>) = (0..3) | ||
.map(|_| mpsc::unbounded_channel::<(ResponseFrame, Option<TargetShard>)>()) | ||
.unzip(); | ||
for (i, tx) in feedback_txs.iter().cloned().enumerate() { | ||
running_proxy.running_nodes[i].change_response_rules(Some(vec![ResponseRule( | ||
Condition::ResponseOpcode(ResponseOpcode::Result) | ||
.and(Condition::not(Condition::ConnectionRegisteredAnyEvent)), | ||
ResponseReaction::noop().with_feedback_when_performed(tx), | ||
)])); | ||
} | ||
|
||
// At first, driver has no info about tablets, so for each key we expect | ||
// to send it to wrong replica at least once. | ||
for values in value_lists.iter() { | ||
println!("{:?}, token: {}", values, prepared.calculate_token(&values).unwrap().unwrap().value); | ||
for _ in 0..10 { | ||
session.execute(&prepared, values).await.unwrap(); | ||
} | ||
|
||
let feedbacks: usize = feedback_rxs.iter_mut().map(count_tablet_feedbacks).sum(); | ||
assert!(feedbacks > 0); | ||
} | ||
|
||
// Here we should have complete info about all the tablets, so there should | ||
// be no occurence of sending to wrong replica. | ||
for values in value_lists.iter() { | ||
println!("{:?}, token: {}", values, prepared.calculate_token(&values).unwrap().unwrap().value); | ||
for _ in 0..10 { | ||
session.execute(&prepared, values).await.unwrap(); | ||
} | ||
let feedbacks: usize = feedback_rxs.iter_mut().map(count_tablet_feedbacks).sum(); | ||
assert_eq!(feedbacks, 0); | ||
} | ||
|
||
running_proxy | ||
}, | ||
) | ||
.await; | ||
match res { | ||
Ok(()) => (), | ||
Err(ProxyError::Worker(WorkerError::DriverDisconnected(_))) => (), | ||
Err(err) => panic!("{}", err), | ||
} | ||
} |