Skip to content

Commit

Permalink
cassandra 5.0 vector type CREATE/INSERT support
Browse files Browse the repository at this point in the history
makes progress towards: #1014

The vector type is introduced by the currently in beta cassandra 5.
See: https://cassandra.apache.org/doc/latest/cassandra/reference/vector-data-type.html
Scylla does not support vector types and so the tests are setup to only
compile/run with a new cassandra_tests config.

This commit does not add support for retrieving the data via a SELECT.
That was omitted to reduce scope and will be implemented in follow up
work.
  • Loading branch information
rukai committed Jun 25, 2024
1 parent c8db77b commit 10fa32c
Show file tree
Hide file tree
Showing 4 changed files with 111 additions and 16 deletions.
2 changes: 1 addition & 1 deletion .github/workflows/cassandra.yml
Original file line number Diff line number Diff line change
Expand Up @@ -31,7 +31,7 @@ jobs:
run: cargo build --verbose --tests --features "full-serialization"
- name: Run tests on cassandra
run: |
CDC='disabled' RUST_LOG=trace SCYLLA_URI=172.42.0.2:9042 SCYLLA_URI2=172.42.0.3:9042 SCYLLA_URI3=172.42.0.4:9042 cargo test --verbose --features "full-serialization" -- --skip test_views_in_schema_info --skip test_large_batch_statements
CDC='disabled' RUSTFLAGS="--cfg cassandra_tests" RUST_LOG=trace SCYLLA_URI=172.42.0.2:9042 SCYLLA_URI2=172.42.0.3:9042 SCYLLA_URI3=172.42.0.4:9042 cargo test --verbose --features "full-serialization" -- --skip test_views_in_schema_info --skip test_large_batch_statements
- name: Stop the cluster
if: ${{ always() }}
run: docker compose -f test/cluster/cassandra/docker-compose.yml stop
Expand Down
42 changes: 42 additions & 0 deletions scylla/src/transport/session_test.rs
Original file line number Diff line number Diff line change
Expand Up @@ -2885,3 +2885,45 @@ async fn test_manual_primary_key_computation() {
.await;
}
}

#[cfg(cassandra_tests)]
#[tokio::test]
async fn test_vector_type() {
setup_tracing();
let session = create_new_session_builder().build().await.unwrap();
let ks = unique_keyspace_name();

session.query(format!("CREATE KEYSPACE IF NOT EXISTS {} WITH REPLICATION = {{'class' : 'NetworkTopologyStrategy', 'replication_factor' : 1}}", ks), &[]).await.unwrap();
session
.query(
format!(
"CREATE TABLE IF NOT EXISTS {}.t (a int PRIMARY KEY, b vector<int, 4>, c vector<text, 2>)",
ks
),
&[],
)
.await
.unwrap();

session
.query(
format!(
"INSERT INTO {}.t (a, b, c) VALUES (1, [1, 2, 3, 4], ['foo', 'bar'])",
ks
),
&[],
)
.await
.unwrap();

let prepared_statement = session
.prepare(format!(
"INSERT INTO {}.t (a, b, c) VALUES (2, [11, 12, 13, 14], ['afoo', 'abar'])",
ks
))
.await
.unwrap();
session.execute(&prepared_statement, &[]).await.unwrap();

// TODO: Implement and test SELECT statements and bind values (`?`)
}
53 changes: 53 additions & 0 deletions scylla/src/transport/topology.rs
Original file line number Diff line number Diff line change
Expand Up @@ -184,6 +184,12 @@ enum PreCqlType {
type_: PreCollectionType,
},
Tuple(Vec<PreCqlType>),
Vector {
type_: Box<PreCqlType>,
/// as per https://cassandra.apache.org/doc/latest/cassandra/reference/vector-data-type.html
/// vectors are limited to a size of 8192
size: u16,
},
UserDefinedType {
frozen: bool,
name: String,
Expand All @@ -207,6 +213,10 @@ impl PreCqlType {
.map(|t| t.into_cql_type(keyspace_name, udts))
.collect(),
),
PreCqlType::Vector { type_, size } => CqlType::Vector {
type_: Box::new(type_.into_cql_type(keyspace_name, udts)),
size,
},
PreCqlType::UserDefinedType { frozen, name } => {
let definition = match udts
.get(keyspace_name)
Expand All @@ -232,6 +242,12 @@ pub enum CqlType {
type_: CollectionType,
},
Tuple(Vec<CqlType>),
Vector {
type_: Box<CqlType>,
/// as per https://cassandra.apache.org/doc/latest/cassandra/reference/vector-data-type.html
/// vectors are limited to a size of 8192
size: u16,
},
UserDefinedType {
frozen: bool,
// Using Arc here in order not to have many copies of the same definition
Expand Down Expand Up @@ -1093,6 +1109,7 @@ fn topo_sort_udts(udts: &mut Vec<UdtRowWithParsedFieldTypes>) -> Result<(), Quer
PreCqlType::Tuple(types) => types
.iter()
.for_each(|type_| do_with_referenced_udts(what, type_)),
PreCqlType::Vector { type_, .. } => do_with_referenced_udts(what, type_),
PreCqlType::UserDefinedType { name, .. } => what(name),
}
}
Expand Down Expand Up @@ -1602,6 +1619,28 @@ fn parse_cql_type(p: ParserState<'_>) -> ParseResult<(PreCqlType, ParserState<'_
})?;

Ok((PreCqlType::Tuple(types), p))
} else if let Ok(p) = p.accept("vector<") {
let (inner_type, p) = parse_cql_type(p)?;

let p = p.skip_white();
let p = p.accept(",")?;
let p = p.skip_white();

let (size, p) = p.take_while(|c| c.is_numeric());
let size = size.parse().map_err(|_| {
p.error(ParseErrorCause::Other(
"Expected integer but found non-integer character",
))
})?;

let p = p.accept(">")?;

let typ = PreCqlType::Vector {
type_: Box::new(inner_type),
size,
};

Ok((typ, p))
} else if let Ok((typ, p)) = parse_native_type(p) {
Ok((PreCqlType::Native(typ), p))
} else if let Ok((name, p)) = parse_user_defined_type(p) {
Expand Down Expand Up @@ -1787,6 +1826,20 @@ mod tests {
PreCqlType::Native(NativeType::Varint),
]),
),
(
"vector<int, 5>",
PreCqlType::Vector {
type_: Box::new(PreCqlType::Native(NativeType::Int)),
size: 5,
},
),
(
"vector<text, 1234>",
PreCqlType::Vector {
type_: Box::new(PreCqlType::Native(NativeType::Text)),
size: 1234,
},
),
(
"com.scylladb.types.AwesomeType",
PreCqlType::UserDefinedType {
Expand Down
30 changes: 15 additions & 15 deletions test/cluster/cassandra/docker-compose.yml
Original file line number Diff line number Diff line change
Expand Up @@ -10,12 +10,12 @@ networks:
- subnet: 172.42.0.0/16
services:
cassandra1:
image: cassandra
image: cassandra:5.0-beta1
healthcheck:
test: ["CMD", "cqlsh", "-e", "describe keyspaces" ]
interval: 5s
timeout: 5s
retries: 60
test: [ "CMD", "cqlsh", "-e", "describe keyspaces" ]
interval: 5s
timeout: 5s
retries: 60
networks:
public:
ipv4_address: 172.42.0.2
Expand All @@ -24,12 +24,12 @@ services:
- HEAP_NEWSIZE=512M
- MAX_HEAP_SIZE=2048M
cassandra2:
image: cassandra
image: cassandra:5.0-beta1
healthcheck:
test: ["CMD", "cqlsh", "-e", "describe keyspaces" ]
interval: 5s
timeout: 5s
retries: 60
test: [ "CMD", "cqlsh", "-e", "describe keyspaces" ]
interval: 5s
timeout: 5s
retries: 60
networks:
public:
ipv4_address: 172.42.0.3
Expand All @@ -42,12 +42,12 @@ services:
cassandra1:
condition: service_healthy
cassandra3:
image: cassandra
image: cassandra:5.0-beta1
healthcheck:
test: ["CMD", "cqlsh", "-e", "describe keyspaces" ]
interval: 5s
timeout: 5s
retries: 60
test: [ "CMD", "cqlsh", "-e", "describe keyspaces" ]
interval: 5s
timeout: 5s
retries: 60
networks:
public:
ipv4_address: 172.42.0.4
Expand Down

0 comments on commit 10fa32c

Please sign in to comment.