Skip to content

Commit

Permalink
SPARKC-227: Fix Connection Caching, Change SSL EnabledAlgorithms to Set
Browse files Browse the repository at this point in the history
The bug here is that an Array (enabledAlgorithms) will not match another
Array even if their contents match exactly, because arrays are compared by
reference rather than by content. This will happen even if the
array is declared as a static final constant. To fix this we need to change
the enabledAlgorithms variable to a Set, which is also more appropriate for
this field (since order and duplicates do not matter). The
original decision to make this an Array was most likely made to match
the Java Driver's implementation; unfortunately, this has some bad
implications for the underlying connection pool, which we can avoid with
a Set.

Since the hash of the CassandraConnectorConf is used by
the cache to determine whether or not a new Cluster object needs to be
made, using an Array forces the creation of a new Cluster object on
every write. These cannot be reused. Changing this to a Set allows the
cache to recognize that two CassandraConnectorConfs whose SSL
enabledAlgorithms have the same contents are the same.
  • Loading branch information
RussellSpitzer authored and pkolaczk committed Aug 7, 2015
1 parent 9295a4b commit 940d581
Show file tree
Hide file tree
Showing 7 changed files with 39 additions and 10 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -6,6 +6,7 @@ import org.apache.spark.SparkConf
import com.datastax.driver.core.ProtocolOptions
import com.datastax.spark.connector.SparkCassandraITFlatSpecBase
import com.datastax.spark.connector.embedded._
import com.datastax.spark.connector._

case class KeyValue(key: Int, group: Long, value: String)
case class KeyValueWithConversion(key: String, group: Int, value: Long)
Expand Down Expand Up @@ -91,6 +92,27 @@ class CassandraConnectorSpec extends SparkCassandraITFlatSpecBase {
session2.isClosed shouldEqual true
}

// Opens a session through two separately constructed connectors that share the
// same SparkConf, then checks that the session cache holds a single entry keyed
// by the equivalent CassandraConnectorConf.
it should "cache session objects for reuse" in {
  (1 to 2).foreach { _ =>
    CassandraConnector(sc.getConf).withSessionDo(_ => ())
  }
  val cachedSessions = CassandraConnector.sessionCache
  val expectedKey = CassandraConnectorConf(sc.getConf)
  cachedSessions.contains(expectedKey) shouldEqual true
  cachedSessions.cache.size shouldEqual 1
}

// Regression test: several saveToCassandra calls issued with the same SparkConf
// must leave exactly one entry in the session cache.
it should "not make multiple clusters when writing multiple RDDs" in {
  // Make sure the target keyspace and table exist before writing.
  CassandraConnector(sc.getConf).withSessionDo { session =>
    session.execute("""CREATE KEYSPACE IF NOT EXISTS test WITH replication = { 'class': 'SimpleStrategy', 'replication_factor': 1 }""")
    session.execute("""CREATE TABLE IF NOT EXISTS test.pair (x int, y int, PRIMARY KEY (x))""")
  }
  // Only the side effect of each write matters here, so neither the loop index
  // nor the result of saveToCassandra is bound to a name.
  (1 to 3).foreach { _ =>
    sc.parallelize(1 to 100).map(x => (x, x)).saveToCassandra("test", "pair")
  }
  val sessionCache = CassandraConnector.sessionCache
  sessionCache.contains(CassandraConnectorConf(sc.getConf)) should be (true)
  sessionCache.cache.size should be (1)
}

it should "be configurable from SparkConf" in {
val host = EmbeddedCassandra.getHost(0).getHostAddress
val conf = defaultConf
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -14,7 +14,7 @@ class CassandraSSLConnectorSpec extends SparkCassandraITFlatSpecBase {
enabled = true,
trustStorePath = Some(ClassLoader.getSystemResource("truststore").getPath),
trustStorePassword = Some("connector"),
enabledAlgorithms = Array("TLS_RSA_WITH_AES_128_CBC_SHA")))
enabledAlgorithms = Set("TLS_RSA_WITH_AES_128_CBC_SHA")))

// Wait for the default user to be created in Cassandra.
Thread.sleep(1000)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -76,7 +76,7 @@ object DefaultConnectionFactory extends CassandraConnectionFactory {

val context = SSLContext.getInstance(conf.protocol)
context.init(null, tmf.getTrustManagers, new SecureRandom)
new SSLOptions(context, conf.enabledAlgorithms)
new SSLOptions(context, conf.enabledAlgorithms.toArray)
}
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -146,7 +146,7 @@ object CassandraConnector extends Logging {

val keepAliveMillis = System.getProperty("spark.cassandra.connection.keep_alive_ms", "250").toInt

private val sessionCache = new RefCountedCache[CassandraConnectorConf, Session](
private[cql] val sessionCache = new RefCountedCache[CassandraConnectorConf, Session](
createSession, destroySession, alternativeConnectionConfigs)

private def createSession(conf: CassandraConnectorConf): Session = {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -44,7 +44,7 @@ object CassandraConnectorConf extends Logging {
trustStorePassword: Option[String] = None,
trustStoreType: String = "JKS",
protocol: String = "TLS",
enabledAlgorithms: Array[String] = SSLOptions.DEFAULT_SSL_CIPHER_SUITES
enabledAlgorithms: Set[String] = SSLOptions.DEFAULT_SSL_CIPHER_SUITES.toSet
)

trait RetryDelayConf {
Expand Down Expand Up @@ -198,7 +198,7 @@ object CassandraConnectorConf extends Logging {
val sslProtocol = conf.get(CassandraConnectionSSLProtocolProperty,
defaultValue = DefaultCassandraSSLConf.protocol)
val sslEnabledAlgorithms = conf.getOption(CassandraConnectionSSLEnabledAlgorithmsProperty)
.map(_.split(",").map(_.trim)).getOrElse(DefaultCassandraSSLConf.enabledAlgorithms)
.map(_.split(",").map(_.trim).toSet).getOrElse(DefaultCassandraSSLConf.enabledAlgorithms)

val cassandraSSLConf = CassandraSSLConf(
enabled = sslEnabled,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -16,16 +16,16 @@ final class RefCountedCache[K, V](create: K => V,
destroy: V => Any,
keys: (K, V) => Set[K] = (_: K, _: V) => Set.empty[K]) {

private case class ReleaseTask(value: V, count: Int, scheduledTime: Long) extends Runnable {
private[cql] case class ReleaseTask(value: V, count: Int, scheduledTime: Long) extends Runnable {
def run() {
releaseImmediately(value, count)
}
}

private val refCounter = new RefCountMap[V]
private val cache = new TrieMap[K, V]
private val valuesToKeys = new TrieMap[V, Set[K]]
private val deferredReleases = new TrieMap[V, ReleaseTask]
private[cql] val refCounter = new RefCountMap[V]
private[cql] val cache = new TrieMap[K, V]
private[cql] val valuesToKeys = new TrieMap[V, Set[K]]
private[cql] val deferredReleases = new TrieMap[V, ReleaseTask]

private def createNewValueAndKeys(key: K): (V, Set[K]) = {
val value = create(key)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -16,6 +16,13 @@ class CassandraConnectorConfSpec extends FlatSpec with Matchers {
SerializationUtils.roundtrip(conf)
}

// Two configurations built independently from fresh SparkConf instances must
// compare equal.
it should "match a conf with the same settings" in {
  val firstConf = CassandraConnectorConf(new SparkConf)
  val secondConf = CassandraConnectorConf(new SparkConf)

  firstConf shouldEqual secondConf
}

it should "resolve default SSL settings correctly" in {
val sparkConf = new SparkConf(loadDefaults = false)

Expand Down

0 comments on commit 940d581

Please sign in to comment.