Skip to content

Commit

Permalink
Merge pull request #37 from sky-uk/cluster_health_check_when_schema_c…
Browse files Browse the repository at this point in the history
…hanges

Don't check cluster health when there are no schema changes.
  • Loading branch information
adrianmkng authored Sep 1, 2016
2 parents 3fed78f + d080356 commit a192a7a
Show file tree
Hide file tree
Showing 14 changed files with 264 additions and 24 deletions.
3 changes: 3 additions & 0 deletions src/main/java/uk/sky/cqlmigrate/CqlLoader.java
Original file line number Diff line number Diff line change
Expand Up @@ -14,6 +14,9 @@ class CqlLoader {
private CqlLoader() {}

static void load(SessionContext sessionContext, List<String> cqlStatements) {
if (!cqlStatements.isEmpty()) {
sessionContext.checkClusterHealth();
}
try {
cqlStatements.stream()
.map(stringStatement -> new SimpleStatement(stringStatement).setConsistencyLevel(sessionContext.getWriteConsistencyLevel()))
Expand Down
2 changes: 1 addition & 1 deletion src/main/java/uk/sky/cqlmigrate/CqlMigratorFactory.java
Original file line number Diff line number Diff line change
Expand Up @@ -27,6 +27,6 @@ public static CqlMigrator create(CassandraLockConfig lockConfig) {
* @since 0.9.4
*/
public static CqlMigrator create(CqlMigratorConfig cqlMigratorConfig) {
return new CqlMigratorImpl(cqlMigratorConfig);
return new CqlMigratorImpl(cqlMigratorConfig, new SessionContextFactory());
}
}
15 changes: 8 additions & 7 deletions src/main/java/uk/sky/cqlmigrate/CqlMigratorImpl.java
Original file line number Diff line number Diff line change
@@ -1,6 +1,9 @@
package uk.sky.cqlmigrate;

import com.datastax.driver.core.*;
import com.datastax.driver.core.Cluster;
import com.datastax.driver.core.Session;
import com.datastax.driver.core.SimpleStatement;
import com.datastax.driver.core.Statement;
import com.google.common.base.Preconditions;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
Expand All @@ -24,9 +27,11 @@ final class CqlMigratorImpl implements CqlMigrator {
private static final Logger LOGGER = LoggerFactory.getLogger(CqlMigratorImpl.class);

private final CqlMigratorConfig cqlMigratorConfig;
private final SessionContextFactory sessionContextFactory;

CqlMigratorImpl(CqlMigratorConfig cqlMigratorConfig) {
CqlMigratorImpl(CqlMigratorConfig cqlMigratorConfig, SessionContextFactory sessionContextFactory) {
this.cqlMigratorConfig = cqlMigratorConfig;
this.sessionContextFactory = sessionContextFactory;
}

/**
Expand Down Expand Up @@ -67,18 +72,14 @@ public void migrate(String[] hosts, int port, String username, String password,
* {@inheritDoc}
*/
public void migrate(Session session, String keyspace, Collection<Path> directories) {
Cluster cluster = session.getCluster();
ClusterHealth clusterHealth = new ClusterHealth(cluster);
clusterHealth.check();

CassandraLockingMechanism cassandraLockingMechanism = new CassandraLockingMechanism(session, keyspace);
Lock lock = new Lock(cassandraLockingMechanism, cqlMigratorConfig.getCassandraLockConfig());
lock.lock();

LOGGER.info("Loading cql files from {}", directories);
CqlPaths paths = CqlPaths.create(directories);

SessionContext sessionContext = new SessionContext(session, cqlMigratorConfig.getReadConsistencyLevel(), cqlMigratorConfig.getWriteConsistencyLevel());
SessionContext sessionContext = sessionContextFactory.getInstance(session, cqlMigratorConfig);

KeyspaceBootstrapper keyspaceBootstrapper = new KeyspaceBootstrapper(sessionContext, keyspace, paths);
SchemaUpdates schemaUpdates = new SchemaUpdates(sessionContext, keyspace);
Expand Down
8 changes: 6 additions & 2 deletions src/main/java/uk/sky/cqlmigrate/SchemaUpdates.java
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,7 @@
import com.datastax.driver.core.Session;
import com.datastax.driver.core.SimpleStatement;
import com.datastax.driver.core.Statement;
import com.datastax.driver.core.TableMetadata;
import com.google.common.base.Throwables;
import com.google.common.hash.Hashing;
import com.google.common.io.ByteSource;
Expand All @@ -14,6 +15,7 @@
import java.io.IOException;
import java.io.InputStream;
import java.nio.file.Path;
import java.util.Collections;

import static com.google.common.base.Preconditions.checkNotNull;

Expand All @@ -33,8 +35,10 @@ class SchemaUpdates {
void initialise() {
Session session = sessionContext.getSession();
session.execute(new SimpleStatement("USE " + keyspace + ";").setConsistencyLevel(sessionContext.getReadConsistencyLevel()));
session.execute(new SimpleStatement("CREATE TABLE IF NOT EXISTS " + SCHEMA_UPDATES_TABLE + " (filename text primary key, " + CHECKSUM_COLUMN + " text, applied_on timestamp);")
.setConsistencyLevel(sessionContext.getWriteConsistencyLevel()));
TableMetadata schemaUpdateTableMetadata = session.getCluster().getMetadata().getKeyspace(keyspace).getTable(SCHEMA_UPDATES_TABLE);
if (schemaUpdateTableMetadata == null) {
CqlLoader.load(sessionContext, Collections.singletonList("CREATE TABLE " + SCHEMA_UPDATES_TABLE + " (filename text primary key, " + CHECKSUM_COLUMN + " text, applied_on timestamp);"));
}
}

boolean alreadyApplied(String filename) {
Expand Down
12 changes: 11 additions & 1 deletion src/main/java/uk/sky/cqlmigrate/SessionContext.java
Original file line number Diff line number Diff line change
Expand Up @@ -8,11 +8,14 @@ class SessionContext {
private final Session session;
private final ConsistencyLevel readConsistencyLevel;
private final ConsistencyLevel writeConsistencyLevel;
private final ClusterHealth clusterHealth;
private boolean clusterHealthChecked = false;

SessionContext(Session session, ConsistencyLevel readConsistencyLevel, ConsistencyLevel writeConsistencyLevel) {
SessionContext(Session session, ConsistencyLevel readConsistencyLevel, ConsistencyLevel writeConsistencyLevel, ClusterHealth clusterHealth) {
this.session = session;
this.readConsistencyLevel = readConsistencyLevel;
this.writeConsistencyLevel = writeConsistencyLevel;
this.clusterHealth = clusterHealth;
}

public Session getSession() {
Expand All @@ -26,4 +29,11 @@ public ConsistencyLevel getReadConsistencyLevel() {
public ConsistencyLevel getWriteConsistencyLevel() {
return writeConsistencyLevel;
}

public void checkClusterHealth() {
if (!clusterHealthChecked) {
clusterHealth.check();
clusterHealthChecked = true;
}
}
}
10 changes: 10 additions & 0 deletions src/main/java/uk/sky/cqlmigrate/SessionContextFactory.java
Original file line number Diff line number Diff line change
@@ -0,0 +1,10 @@
package uk.sky.cqlmigrate;

import com.datastax.driver.core.Session;

class SessionContextFactory {
SessionContext getInstance(Session session, CqlMigratorConfig cqlMigratorConfig) {
ClusterHealth clusterHealth = new ClusterHealth(session.getCluster());
return new SessionContext(session, cqlMigratorConfig.getReadConsistencyLevel(), cqlMigratorConfig.getWriteConsistencyLevel(), clusterHealth);
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -27,6 +27,7 @@

import static java.util.Arrays.asList;
import static org.scassandra.http.client.PrimingRequest.preparedStatementBuilder;
import static org.scassandra.http.client.PrimingRequest.queryBuilder;
import static org.scassandra.http.client.PrimingRequest.then;
import static org.scassandra.http.client.types.ColumnMetadata.column;
import static org.scassandra.matchers.Matchers.containsQuery;
Expand Down Expand Up @@ -65,6 +66,14 @@ public void initialiseLockTable() throws ConfigurationException, IOException, TT
.build()
.connect();

primingClient.prime(queryBuilder()
.withQuery("SELECT * FROM system.schema_keyspaces")
.withThen(
then()
.withColumnTypes(column("keyspace_name", PrimitiveType.TEXT), column("durable_writes", PrimitiveType.BOOLEAN), column("strategy_class", PrimitiveType.VARCHAR), column("strategy_options", PrimitiveType.TEXT))
.withRows(ImmutableMap.of("keyspace_name", "cqlmigrate_test", "durable_writes", true, "strategy_class", "org.apache.cassandra.locator.SimpleStrategy", "strategy_options", "{\"replication_factor\":\"1\"}"))
));

primingClient.prime(preparedStatementBuilder()
.withQuery("INSERT INTO cqlmigrate.locks (name, client) VALUES (?, ?) IF NOT EXISTS")
.withThen(
Expand Down Expand Up @@ -137,12 +146,7 @@ public void shouldApplyCorrectCustomisedConsistencyLevelsConfiguredForUnderlying
}

private void assertStatementsExecuteWithExpectedConsistencyLevels(ConsistencyLevel expectedReadConsistencyLevel, ConsistencyLevel expectedWriteConsistencyLevel) {
//ensure bootstrap.cql is applied with configured write consistency level
Assert.assertThat(activityClient.retrieveQueries(), containsQuery(Query
.builder()
.withQuery("CREATE KEYSPACE cqlmigrate_test WITH replication = {'class': 'SimpleStrategy', 'replication_factor': 1 }")
.withConsistency(expectedWriteConsistencyLevel.toString())
.build()));
//bootstrap.cql is already "applied" as we are priming cassandra to pretend it already has the keyspace

//ensure that schema updates are applied at configured consistency level
Assert.assertThat(activityClient.retrieveQueries(), containsQuery(Query
Expand Down Expand Up @@ -175,7 +179,7 @@ private void assertStatementsExecuteWithExpectedConsistencyLevels(ConsistencyLev
//ensure that create schema_updates table is done at the configured consistency level
Assert.assertThat(activityClient.retrieveQueries(), containsQuery(Query
.builder()
.withQuery("CREATE TABLE IF NOT EXISTS schema_updates (filename text primary key, checksum text, applied_on timestamp);")
.withQuery("CREATE TABLE schema_updates (filename text primary key, checksum text, applied_on timestamp);")
.withConsistency(expectedWriteConsistencyLevel.toString())
.build()));
}
Expand Down
152 changes: 152 additions & 0 deletions src/test/java/uk/sky/cqlmigrate/CqlMigratorHealthCheckTest.java
Original file line number Diff line number Diff line change
@@ -0,0 +1,152 @@
package uk.sky.cqlmigrate;

import com.datastax.driver.core.Cluster;
import com.datastax.driver.core.ConsistencyLevel;
import com.datastax.driver.core.Session;
import com.google.common.collect.Lists;
import com.google.common.util.concurrent.Uninterruptibles;
import org.apache.cassandra.exceptions.ConfigurationException;
import org.apache.thrift.transport.TTransportException;
import org.cassandraunit.utils.EmbeddedCassandraServerHelper;
import org.junit.After;
import org.junit.AfterClass;
import org.junit.Before;
import org.junit.BeforeClass;
import org.junit.Test;
import org.mockito.Mockito;

import java.io.IOException;
import java.net.URISyntaxException;
import java.nio.file.Path;
import java.nio.file.Paths;
import java.util.ArrayList;
import java.util.Collection;
import java.util.concurrent.TimeUnit;

import static java.util.Collections.singletonList;
import static org.mockito.Mockito.*;

public class CqlMigratorHealthCheckTest {
private static final String[] CASSANDRA_HOSTS = {"localhost"};
private static String username = "cassandra";
private static String password = "cassandra";
private static int binaryPort;
private static Cluster cluster;
private static Session session;
private static final String TEST_KEYSPACE = "cqlmigrate_clusterhealth_test";

private ClusterHealth mockClusterHealth = mock(ClusterHealth.class);
private final SessionContextFactory sessionContextFactory = new SessionContextFactory() {
@Override
SessionContext getInstance(Session session, CqlMigratorConfig cqlMigratorConfig) {
return new SessionContext(session, cqlMigratorConfig.getReadConsistencyLevel(), cqlMigratorConfig.getWriteConsistencyLevel(), mockClusterHealth);
}
};

private final CqlMigratorImpl migrator = new CqlMigratorImpl(CqlMigratorConfig.builder()
.withCassandraLockConfig(CassandraLockConfig.builder().build())
.withReadConsistencyLevel(ConsistencyLevel.ALL)
.withWriteConsistencyLevel(ConsistencyLevel.ALL)
.build(), sessionContextFactory);

@BeforeClass
public static void setupCassandra() throws ConfigurationException, IOException, TTransportException, InterruptedException {
EmbeddedCassandraServerHelper.startEmbeddedCassandra(EmbeddedCassandraServerHelper.CASSANDRA_RNDPORT_YML_FILE);
binaryPort = EmbeddedCassandraServerHelper.getNativeTransportPort();

cluster = Cluster.builder()
.addContactPoints(CASSANDRA_HOSTS)
.withPort(binaryPort)
.withCredentials(username, password)
.build();
session = cluster.connect();
}

@Before
public void setUp() throws Exception {
session.execute("DROP KEYSPACE IF EXISTS cqlmigrate_clusterhealth_test");
session.execute("CREATE KEYSPACE IF NOT EXISTS cqlmigrate WITH replication = {'class': 'SimpleStrategy', 'replication_factor': 1 };");
session.execute("CREATE TABLE IF NOT EXISTS cqlmigrate.locks (name text PRIMARY KEY, client text)");
}

@After
public void tearDown() {
session.execute("TRUNCATE cqlmigrate.locks");
System.clearProperty("hosts");
System.clearProperty("keyspace");
System.clearProperty("directories");
}

@AfterClass
public static void tearDownClass() throws Exception {
session.execute("DROP KEYSPACE cqlmigrate");
cluster.close();
EmbeddedCassandraServerHelper.cleanEmbeddedCassandra();
Uninterruptibles.sleepUninterruptibly(100, TimeUnit.MILLISECONDS);
}

private Path getResourcePath(String resourcePath) throws URISyntaxException {
return Paths.get(ClassLoader.getSystemResource(resourcePath).toURI());
}

@Test
public void checkClusterHealthCheckedWhenBootstrappingCassandra() throws Exception {
//given
Collection<Path> cqlPaths = singletonList(getResourcePath("cql_cluster_health_bootstrap"));
//when
migrator.migrate(CASSANDRA_HOSTS, binaryPort, username, password, TEST_KEYSPACE, cqlPaths);

//then
verify(mockClusterHealth, times(1)).check();
}

@Test
public void checkClusterHealthCheckedWhenCreatingSchemaMigrateTable() throws Exception {
//given
session.execute("CREATE KEYSPACE cqlmigrate_clusterhealth_test WITH replication = {'class': 'SimpleStrategy', 'replication_factor': 1 };");
Collection<Path> cqlPaths = singletonList(getResourcePath("cql_cluster_health_bootstrap"));
//when
migrator.migrate(CASSANDRA_HOSTS, binaryPort, username, password, TEST_KEYSPACE, cqlPaths);

//then
verify(mockClusterHealth, times(1)).check();
}

@Test
public void checkClusterHealthCheckedWhenThereAreChanges() throws Exception {
//given
Collection<Path> cqlPaths = new ArrayList<>();
cqlPaths.add(getResourcePath("cql_cluster_health_bootstrap"));
migrator.migrate(CASSANDRA_HOSTS, binaryPort, username, password, TEST_KEYSPACE, cqlPaths);
cqlPaths.add(getResourcePath("cql_cluster_health_first"));
Mockito.reset(mockClusterHealth);
//when
migrator.migrate(CASSANDRA_HOSTS, binaryPort, username, password, TEST_KEYSPACE, cqlPaths);
//then
verify(mockClusterHealth, times(1)).check();
}

@Test
public void checkClusterHealthCheckedWhenThereAreFollowingChanges() throws Exception {
//given
Collection<Path> cqlPaths = Lists.newArrayList(getResourcePath("cql_cluster_health_bootstrap"), getResourcePath("cql_cluster_health_first"));
migrator.migrate(CASSANDRA_HOSTS, binaryPort, username, password, TEST_KEYSPACE, cqlPaths);
Mockito.reset(mockClusterHealth);
cqlPaths.add(getResourcePath("cql_cluster_health_second"));
//when
migrator.migrate(CASSANDRA_HOSTS, binaryPort, username, password, TEST_KEYSPACE, cqlPaths);
//then
verify(mockClusterHealth, times(1)).check();
}

@Test
public void checkClusterHealthNotCheckedWhenThereAreNoChanges() throws Exception {
Collection<Path> cqlPaths = Lists.newArrayList(getResourcePath("cql_cluster_health_bootstrap"), getResourcePath("cql_cluster_health_first"));
migrator.migrate(CASSANDRA_HOSTS, binaryPort, username, password, TEST_KEYSPACE, cqlPaths);
Mockito.reset(mockClusterHealth);
//when
migrator.migrate(CASSANDRA_HOSTS, binaryPort, username, password, TEST_KEYSPACE, cqlPaths);
//then
verify(mockClusterHealth, never()).check();
}
}
4 changes: 2 additions & 2 deletions src/test/java/uk/sky/cqlmigrate/CqlMigratorImplTest.java
Original file line number Diff line number Diff line change
Expand Up @@ -37,7 +37,7 @@ public class CqlMigratorImplTest {
.withCassandraLockConfig(CassandraLockConfig.builder().build())
.withReadConsistencyLevel(ConsistencyLevel.ALL)
.withWriteConsistencyLevel(ConsistencyLevel.ALL)
.build());
.build(), new SessionContextFactory());

private ExecutorService executorService;

Expand Down Expand Up @@ -102,7 +102,7 @@ public void shouldThrowCannotAcquireLockExceptionIfLockCannotBeAcquiredAfterTime
.withCassandraLockConfig(CassandraLockConfig.builder().withPollingInterval(ofMillis(50)).withTimeout(ofMillis(300)).build())
.withReadConsistencyLevel(ConsistencyLevel.ALL)
.withWriteConsistencyLevel(ConsistencyLevel.ALL)
.build());
.build(), new SessionContextFactory());

String client = UUID.randomUUID().toString();
session.execute("INSERT INTO cqlmigrate.locks (name, client) VALUES (?, ?)", LOCK_NAME, client);
Expand Down
11 changes: 7 additions & 4 deletions src/test/java/uk/sky/cqlmigrate/SchemaUpdatesTest.java
Original file line number Diff line number Diff line change
Expand Up @@ -20,21 +20,24 @@
public class SchemaUpdatesTest {

private static final String[] CASSANDRA_HOSTS = {"localhost"};
private static final String TEST_KEYSPACE = "cqlmigrate_test";
private static final String SCHEMA_UPDATES_TABLE = "schema_updates";

private static int binaryPort;
private static String username = "cassandra";
private static String password = "cassandra";
private static Cluster cluster;
private static Session session;
private static final String TEST_KEYSPACE = "cqlmigrate_test";
private static ClusterHealth clusterHealth;

private static final String SCHEMA_UPDATES_TABLE = "schema_updates";

@BeforeClass
public static void setupCassandra() throws ConfigurationException, IOException, TTransportException, InterruptedException {
EmbeddedCassandraServerHelper.startEmbeddedCassandra(EmbeddedCassandraServerHelper.CASSANDRA_RNDPORT_YML_FILE);
binaryPort = EmbeddedCassandraServerHelper.getNativeTransportPort();

cluster = Cluster.builder().addContactPoints(CASSANDRA_HOSTS).withPort(binaryPort).withCredentials(username, password).build();
clusterHealth = new ClusterHealth(cluster);
session = cluster.connect();
}

Expand Down Expand Up @@ -62,7 +65,7 @@ public void schemaUpdatesTableShouldBeCreatedIfNotExists() throws Exception {
//given
cluster.connect("system").execute("CREATE KEYSPACE " + TEST_KEYSPACE + " WITH replication = {'class': 'SimpleStrategy', 'replication_factor': 1 };");
Session session = cluster.connect(TEST_KEYSPACE);
SessionContext sessionContext = new SessionContext(session, ConsistencyLevel.ALL, ConsistencyLevel.ALL);
SessionContext sessionContext = new SessionContext(session, ConsistencyLevel.ALL, ConsistencyLevel.ALL, clusterHealth);
SchemaUpdates schemaUpdates = new SchemaUpdates(sessionContext, TEST_KEYSPACE);

//when
Expand All @@ -78,7 +81,7 @@ public void schemaUpdatesTableShouldNotBeCreatedIfExists() throws Exception {
//given
cluster.connect("system").execute("CREATE KEYSPACE " + TEST_KEYSPACE + " WITH replication = {'class': 'SimpleStrategy', 'replication_factor': 1 };");
Session session = cluster.connect(TEST_KEYSPACE);
SessionContext sessionContext = new SessionContext(session, ConsistencyLevel.ALL, ConsistencyLevel.ALL);
SessionContext sessionContext = new SessionContext(session, ConsistencyLevel.ALL, ConsistencyLevel.ALL, clusterHealth);
SchemaUpdates schemaUpdates = new SchemaUpdates(sessionContext, TEST_KEYSPACE);

//when
Expand Down
Loading

0 comments on commit a192a7a

Please sign in to comment.