Skip to content

Commit

Permalink
Merge pull request scylladb#1102 from Lorak-mmk/overhaul-integration-…
Browse files Browse the repository at this point in the history
…tests

Move some tests to integration

(cherry picked from commit be14812)
  • Loading branch information
Lorak-mmk authored and wprzytula committed Dec 11, 2024
1 parent 02ef01c commit e174246
Show file tree
Hide file tree
Showing 24 changed files with 500 additions and 383 deletions.
249 changes: 2 additions & 247 deletions scylla/src/history.rs
Original file line number Diff line number Diff line change
Expand Up @@ -449,28 +449,21 @@ fn write_fiber_attempts(fiber: &FiberHistory, f: &mut std::fmt::Formatter<'_>) -

#[cfg(test)]
mod tests {
use std::{
net::{IpAddr, Ipv4Addr, SocketAddr},
sync::Arc,
};
use std::net::{IpAddr, Ipv4Addr, SocketAddr};

use crate::{
query::Query,
retry_policy::RetryDecision,
test_utils::setup_tracing,
transport::errors::{DbError, QueryError},
utils::test_utils::unique_keyspace_name,
};

use super::{
AttemptId, AttemptResult, HistoryCollector, HistoryListener, QueryHistoryResult, QueryId,
SpeculativeId, StructuredHistory, TimePoint,
};
use crate::test_utils::create_new_session_builder;
use assert_matches::assert_matches;
use chrono::{DateTime, NaiveDate, NaiveDateTime, NaiveTime, Utc};
use futures::StreamExt as _;
use scylla_cql::{frame::response::result::Row, Consistency};
use scylla_cql::Consistency;

// Set a single time for all timestamps within StructuredHistory.
// HistoryCollector sets the timestamp to current time which changes with each test.
Expand Down Expand Up @@ -510,53 +503,6 @@ mod tests {
history
}

// Set a single node for all attempts within StructuredHistory.
// When running against real life nodes this address may change,
// setting it to one value makes it possible to run tests consistently.
fn set_one_node(mut history: StructuredHistory) -> StructuredHistory {
let the_node: SocketAddr = node1_addr();

for query in &mut history.queries {
for fiber in std::iter::once(&mut query.non_speculative_fiber)
.chain(query.speculative_fibers.iter_mut())
{
for attempt in &mut fiber.attempts {
attempt.node_addr = the_node;
}
}
}

history
}

// Set a single error message for all DbErrors within StructuredHistory.
// The error message changes between Scylla/Cassandra/their versions.
// Setting it to one value makes it possible to run tests consistently.
fn set_one_db_error_message(mut history: StructuredHistory) -> StructuredHistory {
let set_msg = |err: &mut QueryError| {
if let QueryError::DbError(_, msg) = err {
*msg = "Error message from database".to_string();
}
};

for query in &mut history.queries {
if let Some(QueryHistoryResult::Error(_, err)) = &mut query.result {
set_msg(err);
}
for fiber in std::iter::once(&mut query.non_speculative_fiber)
.chain(query.speculative_fibers.iter_mut())
{
for attempt in &mut fiber.attempts {
if let Some(AttemptResult::Error(_, err, _)) = &mut attempt.result {
set_msg(err);
}
}
}
}

history
}

fn node1_addr() -> SocketAddr {
SocketAddr::new(IpAddr::V4(Ipv4Addr::new(127, 0, 0, 1)), 19042)
}
Expand Down Expand Up @@ -913,195 +859,4 @@ mod tests {
";
assert_eq!(displayed, format!("{}", set_one_time(history)));
}

#[tokio::test]
async fn successful_query_history() {
setup_tracing();
let session = create_new_session_builder().build().await.unwrap();

let mut query = Query::new("SELECT * FROM system.local");
let history_collector = Arc::new(HistoryCollector::new());
query.set_history_listener(history_collector.clone());

session.query_unpaged(query.clone(), ()).await.unwrap();

let history: StructuredHistory = history_collector.clone_structured_history();

let displayed = "Queries History:
=== Query #0 ===
| start_time: 2022-02-22 20:22:22 UTC
| Non-speculative attempts:
| - Attempt #0 sent to 127.0.0.1:19042
| request send time: 2022-02-22 20:22:22 UTC
| Success at 2022-02-22 20:22:22 UTC
|
| Query successful at 2022-02-22 20:22:22 UTC
=================
";
assert_eq!(
displayed,
format!(
"{}",
set_one_db_error_message(set_one_node(set_one_time(history)))
)
);

// Prepared queries retain the history listener set in Query.
let prepared = session.prepare(query).await.unwrap();
session.execute_unpaged(&prepared, ()).await.unwrap();

let history2: StructuredHistory = history_collector.clone_structured_history();

let displayed2 = "Queries History:
=== Query #0 ===
| start_time: 2022-02-22 20:22:22 UTC
| Non-speculative attempts:
| - Attempt #0 sent to 127.0.0.1:19042
| request send time: 2022-02-22 20:22:22 UTC
| Success at 2022-02-22 20:22:22 UTC
|
| Query successful at 2022-02-22 20:22:22 UTC
=================
=== Query #1 ===
| start_time: 2022-02-22 20:22:22 UTC
| Non-speculative attempts:
| - Attempt #0 sent to 127.0.0.1:19042
| request send time: 2022-02-22 20:22:22 UTC
| Success at 2022-02-22 20:22:22 UTC
|
| Query successful at 2022-02-22 20:22:22 UTC
=================
";
assert_eq!(
displayed2,
format!(
"{}",
set_one_db_error_message(set_one_node(set_one_time(history2)))
)
);
}

#[tokio::test]
async fn failed_query_history() {
setup_tracing();
let session = create_new_session_builder().build().await.unwrap();

let mut query = Query::new("This isnt even CQL");
let history_collector = Arc::new(HistoryCollector::new());
query.set_history_listener(history_collector.clone());

assert!(session.query_unpaged(query.clone(), ()).await.is_err());

let history: StructuredHistory = history_collector.clone_structured_history();

let displayed =
"Queries History:
=== Query #0 ===
| start_time: 2022-02-22 20:22:22 UTC
| Non-speculative attempts:
| - Attempt #0 sent to 127.0.0.1:19042
| request send time: 2022-02-22 20:22:22 UTC
| Error at 2022-02-22 20:22:22 UTC
| Error: Database returned an error: The submitted query has a syntax error, Error message: Error message from database
| Retry decision: DontRetry
|
| Query failed at 2022-02-22 20:22:22 UTC
| Error: Database returned an error: The submitted query has a syntax error, Error message: Error message from database
=================
";
assert_eq!(
displayed,
format!(
"{}",
set_one_db_error_message(set_one_node(set_one_time(history)))
)
);
}

#[tokio::test]
async fn iterator_query_history() {
setup_tracing();
let session = create_new_session_builder().build().await.unwrap();
let ks = unique_keyspace_name();
session
.query_unpaged(format!("CREATE KEYSPACE {} WITH REPLICATION = {{'class' : 'NetworkTopologyStrategy', 'replication_factor' : 1}}", ks), &[])
.await
.unwrap();
session.use_keyspace(ks, true).await.unwrap();

session
.query_unpaged("CREATE TABLE t (p int primary key)", ())
.await
.unwrap();
for i in 0..32 {
session
.query_unpaged("INSERT INTO t (p) VALUES (?)", (i,))
.await
.unwrap();
}

let mut iter_query: Query = Query::new("SELECT * FROM t");
iter_query.set_page_size(8);
let history_collector = Arc::new(HistoryCollector::new());
iter_query.set_history_listener(history_collector.clone());

let mut rows_iterator = session
.query_iter(iter_query, ())
.await
.unwrap()
.rows_stream::<Row>()
.unwrap();
while let Some(_row) = rows_iterator.next().await {
// Receive rows...
}

let history = history_collector.clone_structured_history();

assert!(history.queries.len() >= 4);

let displayed_prefix = "Queries History:
=== Query #0 ===
| start_time: 2022-02-22 20:22:22 UTC
| Non-speculative attempts:
| - Attempt #0 sent to 127.0.0.1:19042
| request send time: 2022-02-22 20:22:22 UTC
| Success at 2022-02-22 20:22:22 UTC
|
| Query successful at 2022-02-22 20:22:22 UTC
=================
=== Query #1 ===
| start_time: 2022-02-22 20:22:22 UTC
| Non-speculative attempts:
| - Attempt #0 sent to 127.0.0.1:19042
| request send time: 2022-02-22 20:22:22 UTC
| Success at 2022-02-22 20:22:22 UTC
|
| Query successful at 2022-02-22 20:22:22 UTC
=================
=== Query #2 ===
| start_time: 2022-02-22 20:22:22 UTC
| Non-speculative attempts:
| - Attempt #0 sent to 127.0.0.1:19042
| request send time: 2022-02-22 20:22:22 UTC
| Success at 2022-02-22 20:22:22 UTC
|
| Query successful at 2022-02-22 20:22:22 UTC
=================
=== Query #3 ===
| start_time: 2022-02-22 20:22:22 UTC
| Non-speculative attempts:
| - Attempt #0 sent to 127.0.0.1:19042
| request send time: 2022-02-22 20:22:22 UTC
| Success at 2022-02-22 20:22:22 UTC
|
| Query successful at 2022-02-22 20:22:22 UTC
=================
";
let displayed_str = format!(
"{}",
set_one_db_error_message(set_one_node(set_one_time(history)))
);

assert!(displayed_str.starts_with(displayed_prefix),);
}
}
6 changes: 2 additions & 4 deletions scylla/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -257,10 +257,8 @@ pub mod transport;

pub(crate) mod utils;

/// This module is NOT part of the public API (it is `pub` only for internal use of integration tests).
/// Future minor releases are free to introduce breaking API changes inside it.
#[doc(hidden)]
pub use utils::test_utils;
#[cfg(test)]
pub(crate) use utils::test_utils;

pub use statement::batch;
pub use statement::prepared_statement;
Expand Down
25 changes: 1 addition & 24 deletions scylla/src/transport/load_balancing/default.rs
Original file line number Diff line number Diff line change
Expand Up @@ -3128,7 +3128,7 @@ mod latency_awareness {
use crate::{
load_balancing::default::NodeLocationPreference,
routing::Shard,
test_utils::{create_new_session_builder, setup_tracing},
test_utils::setup_tracing,
transport::locator::test::{TABLE_INVALID, TABLE_NTS_RF_2, TABLE_NTS_RF_3},
};
use crate::{
Expand All @@ -3141,7 +3141,6 @@ mod latency_awareness {
locator::test::{id_to_invalid_addr, A, B, C, D, E, F, G},
ClusterData, NodeAddr,
},
ExecutionProfile,
};
use tokio::time::Instant;

Expand Down Expand Up @@ -3847,28 +3846,6 @@ mod latency_awareness {
}
}

// This is a regression test for #696.
#[tokio::test]
#[ntest::timeout(1000)]
async fn latency_aware_query_completes() {
setup_tracing();
let policy = DefaultPolicy::builder()
.latency_awareness(LatencyAwarenessBuilder::default())
.build();
let handle = ExecutionProfile::builder()
.load_balancing_policy(policy)
.build()
.into_handle();

let session = create_new_session_builder()
.default_execution_profile_handle(handle)
.build()
.await
.unwrap();

session.query_unpaged("whatever", ()).await.unwrap_err();
}

#[tokio::test(start_paused = true)]
async fn timestamped_average_works_when_clock_stops() {
setup_tracing();
Expand Down
13 changes: 0 additions & 13 deletions scylla/src/transport/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -25,21 +25,8 @@ pub use connection::SelfIdentity;
pub use execution_profile::ExecutionProfile;
pub use scylla_cql::frame::request::query::{PagingState, PagingStateResponse};

#[cfg(test)]
mod authenticate_test;
#[cfg(test)]
mod cql_collections_test;
#[cfg(test)]
mod session_test;
#[cfg(test)]
mod silent_prepare_batch_test;

#[cfg(test)]
mod cql_types_test;
#[cfg(test)]
mod cql_value_test;
#[cfg(test)]
mod large_batch_statements_test;

pub use cluster::ClusterData;
pub use node::{KnownNode, Node, NodeAddr, NodeRef};
4 changes: 3 additions & 1 deletion scylla/src/utils/mod.rs
Original file line number Diff line number Diff line change
@@ -1,4 +1,6 @@
pub(crate) mod parse;

pub(crate) mod pretty;
pub mod test_utils;

#[cfg(test)]
pub(crate) mod test_utils;
Loading

0 comments on commit e174246

Please sign in to comment.