From 3fe13046b2480bee5a46018449114753ad2fe040 Mon Sep 17 00:00:00 2001 From: Lucas Kent Date: Mon, 24 Jun 2024 14:58:46 +1000 Subject: [PATCH] cassandra 5.0 vector type CREATE/INSERT support makes progress towards: https://github.com/scylladb/scylla-rust-driver/issues/1014 The vector type is introduced by the currently in beta cassandra 5. See: https://cassandra.apache.org/doc/latest/cassandra/reference/vector-data-type.html Scylla does not support vector types and so the tests are setup to only compile/run with a new cassandra_tests config. This commit does not add support for retrieving the data via a SELECT. That was omitted to reduce scope and will be implemented in follow up work. --- .github/workflows/cassandra.yml | 16 ++++++ scylla/src/transport/session_test.rs | 58 ++++++++++++++++++++ scylla/src/transport/topology.rs | 47 ++++++++++++++++ scylla/src/utils/parse.rs | 15 ++++++ test/cluster/cassandra5/docker-compose.yml | 63 ++++++++++++++++++++++ 5 files changed, 199 insertions(+) create mode 100644 test/cluster/cassandra5/docker-compose.yml diff --git a/.github/workflows/cassandra.yml b/.github/workflows/cassandra.yml index 4926ece5d6..84436421ab 100644 --- a/.github/workflows/cassandra.yml +++ b/.github/workflows/cassandra.yml @@ -40,3 +40,19 @@ jobs: run: docker compose -f test/cluster/cassandra/docker-compose.yml logs - name: Remove cluster run: docker compose -f test/cluster/cassandra/docker-compose.yml down + + # TODO: delete the below and move RUSTFLAGS into the above test run when cassandra 5.0.0 releases. + - name: Setup 3-node Cassandra cluster + run: | + docker compose -f test/cluster/cassandra5/docker-compose.yml up -d --wait + - name: Run tests on cassandra 5 beta + run: | + CDC='disabled' RUSTFLAGS="--cfg cassandra_tests" RUST_LOG=trace SCYLLA_URI=172.42.0.2:9042 SCYLLA_URI2=172.42.0.3:9042 SCYLLA_URI3=172.42.0.4:9042 cargo test --verbose --features "full-serialization" -- --skip test_views_in_schema_info --skip test_large_batch_statements + - name: Stop the cluster + if: ${{ always() }} + run: docker compose -f test/cluster/cassandra5/docker-compose.yml stop + - name: Print the cluster logs + if: ${{ always() }} + run: docker compose -f test/cluster/cassandra5/docker-compose.yml logs + - name: Remove cluster + run: docker compose -f test/cluster/cassandra5/docker-compose.yml down diff --git a/scylla/src/transport/session_test.rs b/scylla/src/transport/session_test.rs index ab891c72f8..bda3ed86f3 100644 --- a/scylla/src/transport/session_test.rs +++ b/scylla/src/transport/session_test.rs @@ -2885,3 +2885,61 @@ async fn test_manual_primary_key_computation() { .await; } } + +#[cfg(cassandra_tests)] +#[tokio::test] +async fn test_vector_type() { + setup_tracing(); + let session = create_new_session_builder().build().await.unwrap(); + let ks = unique_keyspace_name(); + + session.query(format!("CREATE KEYSPACE IF NOT EXISTS {} WITH REPLICATION = {{'class' : 'NetworkTopologyStrategy', 'replication_factor' : 1}}", ks), &[]).await.unwrap(); + session + .query( + format!( + "CREATE TABLE IF NOT EXISTS {}.t (a int PRIMARY KEY, b vector, c vector)", + ks + ), + &[], + ) + .await + .unwrap(); + + session + .query( + format!( + "INSERT INTO {}.t (a, b, c) VALUES (1, [1, 2, 3, 4], ['foo', 'bar'])", + ks + ), + &[], + ) + .await + .unwrap(); + + let prepared_statement = session + .prepare(format!( + "INSERT INTO {}.t (a, b, c) VALUES (?, [11, 12, 13, 14], ['afoo', 'abar'])", + ks + )) + .await + .unwrap(); + session.execute(&prepared_statement, &(2,)).await.unwrap(); + let metadata = session.get_cluster_data(); + let columns = &metadata.keyspaces[&ks].tables["t"].columns; + assert_eq!( + columns["b"].type_, + CqlType::Vector { + type_: Box::new(CqlType::Native(NativeType::Int)), + dimensions: 4, + }, + ); + assert_eq!( + columns["c"].type_, + CqlType::Vector { + type_: Box::new(CqlType::Native(NativeType::Text)), + dimensions: 2, + }, + ); + + // TODO: Implement and test SELECT statements and bind values (`?`) +} diff --git a/scylla/src/transport/topology.rs b/scylla/src/transport/topology.rs index b468050c0b..696bbe0c1a 100644 --- a/scylla/src/transport/topology.rs +++ b/scylla/src/transport/topology.rs @@ -184,6 +184,12 @@ enum PreCqlType { type_: PreCollectionType, }, Tuple(Vec), + Vector { + type_: Box, + /// matches the datatype used by the java driver: + /// + dimensions: i32, + }, UserDefinedType { frozen: bool, name: String, @@ -207,6 +213,10 @@ impl PreCqlType { .map(|t| t.into_cql_type(keyspace_name, udts)) .collect(), ), + PreCqlType::Vector { type_, dimensions } => CqlType::Vector { + type_: Box::new(type_.into_cql_type(keyspace_name, udts)), + dimensions, + }, PreCqlType::UserDefinedType { frozen, name } => { let definition = match udts .get(keyspace_name) @@ -232,6 +242,12 @@ pub enum CqlType { type_: CollectionType, }, Tuple(Vec), + Vector { + type_: Box, + /// matches the datatype used by the java driver: + /// + dimensions: i32, + }, UserDefinedType { frozen: bool, // Using Arc here in order not to have many copies of the same definition @@ -1093,6 +1109,7 @@ fn topo_sort_udts(udts: &mut Vec) -> Result<(), Quer PreCqlType::Tuple(types) => types .iter() .for_each(|type_| do_with_referenced_udts(what, type_)), + PreCqlType::Vector { type_, .. } => do_with_referenced_udts(what, type_), PreCqlType::UserDefinedType { name, .. } => what(name), } } @@ -1602,6 +1619,22 @@ fn parse_cql_type(p: ParserState<'_>) -> ParseResult<(PreCqlType, ParserState<'_ })?; Ok((PreCqlType::Tuple(types), p)) + } else if let Ok(p) = p.accept("vector<") { + let (inner_type, p) = parse_cql_type(p)?; + + let p = p.skip_white(); + let p = p.accept(",")?; + let p = p.skip_white(); + let (size, p) = p.parse_i32()?; + let p = p.skip_white(); + let p = p.accept(">")?; + + let typ = PreCqlType::Vector { + type_: Box::new(inner_type), + dimensions: size, + }; + + Ok((typ, p)) } else if let Ok((typ, p)) = parse_native_type(p) { Ok((PreCqlType::Native(typ), p)) } else if let Ok((name, p)) = parse_user_defined_type(p) { @@ -1787,6 +1820,20 @@ mod tests { PreCqlType::Native(NativeType::Varint), ]), ), + ( + "vector", + PreCqlType::Vector { + type_: Box::new(PreCqlType::Native(NativeType::Int)), + dimensions: 5, + }, + ), + ( + "vector", + PreCqlType::Vector { + type_: Box::new(PreCqlType::Native(NativeType::Text)), + dimensions: 1234, + }, + ), ( "com.scylladb.types.AwesomeType", PreCqlType::UserDefinedType { diff --git a/scylla/src/utils/parse.rs b/scylla/src/utils/parse.rs index 1c5e59ecb7..96aa7976d7 100644 --- a/scylla/src/utils/parse.rs +++ b/scylla/src/utils/parse.rs @@ -87,6 +87,21 @@ impl<'s> ParserState<'s> { me } + /// Parses a sequence of digits and '-' as an integer. + /// Consumes characters until it finds a character that is not a digit or '-'. + /// + /// An error is returned if: + /// * The first character is not a digit or '-' + /// * The integer is larger than i32 + pub(crate) fn parse_i32(self) -> ParseResult<(i32, Self)> { + let (digits, p) = self.take_while(|c| c.is_ascii_digit() || c == '-'); + if let Ok(value) = digits.parse() { + Ok((value, p)) + } else { + Err(p.error(ParseErrorCause::Expected("integer of max length 2**32"))) + } + } + /// Skips characters from the beginning while they satisfy given predicate /// and returns new parser state which pub(crate) fn take_while(self, mut pred: impl FnMut(char) -> bool) -> (&'s str, Self) { diff --git a/test/cluster/cassandra5/docker-compose.yml b/test/cluster/cassandra5/docker-compose.yml new file mode 100644 index 0000000000..2c60afc3b2 --- /dev/null +++ b/test/cluster/cassandra5/docker-compose.yml @@ -0,0 +1,63 @@ +# TODO: when cassandra 5.0.0 releases, remove this file and use cluster/cassandra/docker-compose.yml instead + +version: '2.4' # 2.4 is the last version that supports depends_on conditions for service health + +networks: + public: + name: scylla_rust_driver_public + driver: bridge + ipam: + driver: default + config: + - subnet: 172.42.0.0/16 +services: + cassandra1: + image: cassandra:5.0-beta1 + healthcheck: + test: [ "CMD", "cqlsh", "-e", "describe keyspaces" ] + interval: 5s + timeout: 5s + retries: 60 + networks: + public: + ipv4_address: 172.42.0.2 + environment: + - CASSANDRA_BROADCAST_ADDRESS=172.42.0.2 + - HEAP_NEWSIZE=512M + - MAX_HEAP_SIZE=2048M + cassandra2: + image: cassandra:5.0-beta1 + healthcheck: + test: [ "CMD", "cqlsh", "-e", "describe keyspaces" ] + interval: 5s + timeout: 5s + retries: 60 + networks: + public: + ipv4_address: 172.42.0.3 + environment: + - CASSANDRA_BROADCAST_ADDRESS=172.42.0.3 + - CASSANDRA_SEEDS=172.42.0.2 + - HEAP_NEWSIZE=512M + - MAX_HEAP_SIZE=2048M + depends_on: + cassandra1: + condition: service_healthy + cassandra3: + image: cassandra:5.0-beta1 + healthcheck: + test: [ "CMD", "cqlsh", "-e", "describe keyspaces" ] + interval: 5s + timeout: 5s + retries: 60 + networks: + public: + ipv4_address: 172.42.0.4 + environment: + - CASSANDRA_BROADCAST_ADDRESS=172.42.0.4 + - CASSANDRA_SEEDS=172.42.0.2,172.42.0.3 + - HEAP_NEWSIZE=512M + - MAX_HEAP_SIZE=2048M + depends_on: + cassandra2: + condition: service_healthy