diff --git a/docs/configs/janusgraph-cfg.md b/docs/configs/janusgraph-cfg.md index e8d07a7043..5ab0838965 100644 --- a/docs/configs/janusgraph-cfg.md +++ b/docs/configs/janusgraph-cfg.md @@ -437,7 +437,7 @@ CQL storage backend options | storage.cql.compaction-strategy-options | Compaction strategy options. This list is interpreted as a map. It must have an even number of elements in [key,val,key,val,...] form. | String[] | (no default value) | FIXED | | storage.cql.compression | Whether the storage backend should use compression when storing the data | Boolean | true | FIXED | | storage.cql.compression-block-size | The size of the compression blocks in kilobytes | Integer | 64 | FIXED | -| storage.cql.compression-type | The sstable_compression value JanusGraph uses when creating column families. This accepts any value allowed by Cassandra's sstable_compression option. Leave this unset to disable sstable_compression on JanusGraph-created CFs. | String | LZ4Compressor | MASKABLE | +| storage.cql.compression-type | The compression class value JanusGraph uses when creating column families. This accepts any value allowed by Cassandra's compression class option. Leave this unset to disable compression on JanusGraph-created CFs. | String | LZ4Compressor | MASKABLE | | storage.cql.gc-grace-seconds | The number of seconds before tombstones (deletion markers) are eligible for garbage-collection. | Integer | (no default value) | FIXED | | storage.cql.heartbeat-interval | The connection heartbeat interval in milliseconds. | Long | (no default value) | MASKABLE | | storage.cql.heartbeat-timeout | How long the driver waits for the response (in milliseconds) to a heartbeat. | Long | (no default value) | MASKABLE | diff --git a/janusgraph-cql/src/main/java/org/janusgraph/diskstorage/cql/CQLConfigOptions.java b/janusgraph-cql/src/main/java/org/janusgraph/diskstorage/cql/CQLConfigOptions.java index 9d6f00c57a..c64566b5a6 100644 --- a/janusgraph-cql/src/main/java/org/janusgraph/diskstorage/cql/CQLConfigOptions.java +++ b/janusgraph-cql/src/main/java/org/janusgraph/diskstorage/cql/CQLConfigOptions.java @@ -178,9 +178,9 @@ public interface CQLConfigOptions { ConfigOption CF_COMPRESSION_TYPE = new ConfigOption<>( CQL_NS, "compression-type", - "The sstable_compression value JanusGraph uses when creating column families. " + - "This accepts any value allowed by Cassandra's sstable_compression option. " + - "Leave this unset to disable sstable_compression on JanusGraph-created CFs.", + "The compression class value JanusGraph uses when creating column families. " + + "This accepts any value allowed by Cassandra's compression class option. " + + "Leave this unset to disable compression on JanusGraph-created CFs.", ConfigOption.Type.MASKABLE, "LZ4Compressor"); diff --git a/janusgraph-cql/src/main/java/org/janusgraph/diskstorage/cql/CQLKeyColumnValueStore.java b/janusgraph-cql/src/main/java/org/janusgraph/diskstorage/cql/CQLKeyColumnValueStore.java index 247e4e838a..b01136a176 100644 --- a/janusgraph-cql/src/main/java/org/janusgraph/diskstorage/cql/CQLKeyColumnValueStore.java +++ b/janusgraph-cql/src/main/java/org/janusgraph/diskstorage/cql/CQLKeyColumnValueStore.java @@ -21,6 +21,7 @@ import com.datastax.oss.driver.api.core.cql.BoundStatement; import com.datastax.oss.driver.api.core.cql.BoundStatementBuilder; import com.datastax.oss.driver.api.core.cql.PreparedStatement; +import com.datastax.oss.driver.api.core.cql.ResultSet; import com.datastax.oss.driver.api.core.cql.Row; import com.datastax.oss.driver.api.core.metadata.TokenMap; import com.datastax.oss.driver.api.core.servererrors.QueryValidationException; @@ -305,7 +306,19 @@ private boolean shouldInitializeTable() { .orElse(true); } + private static int getCassandraMajorVersion(final CqlSession session) { + try { + ResultSet rs = session.execute("SELECT release_version FROM system.local"); + Row row = rs.one(); + String version = row.getString("release_version"); + return Integer.parseInt(version.split("\\.")[0]); + } catch (final Exception e) { + return 0; + } + } + private static void initializeTable(final CqlSession session, final String keyspaceName, final String tableName, final Configuration configuration) { + int cassandraMajorVersion = getCassandraMajorVersion(session); CreateTableWithOptions createTable = createTable(keyspaceName, tableName) .ifNotExists() .withPartitionKey(KEY_COLUMN_NAME, DataTypes.BLOB) @@ -313,7 +326,7 @@ private static void initializeTable(final CqlSession session, final String keysp .withColumn(VALUE_COLUMN_NAME, DataTypes.BLOB); createTable = compactionOptions(createTable, configuration); - createTable = compressionOptions(createTable, configuration); + createTable = compressionOptions(createTable, configuration, cassandraMajorVersion); createTable = gcGraceSeconds(createTable, configuration); createTable = speculativeRetryOptions(createTable, configuration); @@ -321,7 +334,8 @@ private static void initializeTable(final CqlSession session, final String keysp } private static CreateTableWithOptions compressionOptions(final CreateTableWithOptions createTable, - final Configuration configuration) { + final Configuration configuration, + final int cassandraMajorVersion) { if (!configuration.get(CF_COMPRESSION)) { // No compression return createTable.withNoCompression(); @@ -329,9 +343,14 @@ private static CreateTableWithOptions compressionOptions(final CreateTableWithOp String compressionType = configuration.get(CF_COMPRESSION_TYPE); int chunkLengthInKb = configuration.get(CF_COMPRESSION_BLOCK_SIZE); + Map options; + + if (cassandraMajorVersion >= 5) + options = ImmutableMap.of("class", compressionType, "chunk_length_in_kb", chunkLengthInKb); + else + options = ImmutableMap.of("sstable_compression", compressionType, "chunk_length_kb", chunkLengthInKb); - return createTable.withOption("compression", - ImmutableMap.of("sstable_compression", compressionType, "chunk_length_kb", chunkLengthInKb)); + return createTable.withOption("compression", options); } static CreateTableWithOptions compactionOptions(final CreateTableWithOptions createTable,