Skip to content

Commit

Permalink
support retries with smaller batch sizes when receiving an InvalidQueryException while executing a BatchStatement
Browse files Browse the repository at this point in the history

work in progress.
  • Loading branch information
jwijgerd committed Jan 31, 2024
1 parent 2de8e4a commit 36147d0
Show file tree
Hide file tree
Showing 9 changed files with 440 additions and 65 deletions.
Original file line number Diff line number Diff line change
@@ -0,0 +1,40 @@
/*
* Copyright 2013 - 2023 The Original Authors
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*
*/

package org.elasticsoftware.elasticactors.cassandra2.state;

import com.datastax.driver.core.BatchStatement;

/**
 * Signals that a {@link BatchStatement} exceeded the maximum request size the
 * cluster will accept. Carries the offending batch and its serialized size so
 * the caller can recover, e.g. by splitting the batch and retrying.
 */
public class BatchTooLargeException extends RuntimeException {
    private final BatchStatement originalBatch;
    private final int batchSize;

    /**
     * @param originalBatch the batch that was rejected by the cluster
     * @param batchSize     the serialized size (in bytes) of the rejected batch
     */
    public BatchTooLargeException(BatchStatement originalBatch, int batchSize) {
        super(String.format("BatchStatement of size %d too large to execute", batchSize));
        this.originalBatch = originalBatch;
        this.batchSize = batchSize;
    }

    /** @return the serialized size (in bytes) of the rejected batch */
    public int getBatchSize() {
        return batchSize;
    }

    /** @return the batch that was rejected by the cluster */
    public BatchStatement getOriginalBatch() {
        return originalBatch;
    }
}
Original file line number Diff line number Diff line change
Expand Up @@ -49,6 +49,7 @@ public final class PersistentActorUpdateEventProcessor implements ThreadBoundEve
private final PreparedStatement deleteStatement;
private final Map<Integer,PreparedStatement> batchStatements = new HashMap<>();
private final boolean optimizedV1Batches;
private final ProtocolVersion protocolVersion;

public PersistentActorUpdateEventProcessor(Session cassandraSession, int maxBatchSize) {
this(cassandraSession, maxBatchSize, true);
Expand All @@ -62,6 +63,7 @@ public PersistentActorUpdateEventProcessor(Session cassandraSession, int maxBatc
prepareBatchIfNeeded(maxBatchSize);
}
this.optimizedV1Batches = optimizedV1Batches;
this.protocolVersion = cassandraSession.getCluster().getConfiguration().getProtocolOptions().getProtocolVersion();
}

/**
Expand Down Expand Up @@ -96,30 +98,7 @@ public void process(List<PersistentActorUpdateEvent> events) {
final long startTime = logger.isTraceEnabled() ? System.nanoTime() : 0L;
try {
// optimized to use the prepared statement
if(events.size() == 1) {
PersistentActorUpdateEvent event = events.get(0);
BoundStatement boundStatement;
if (event.hasPersistentActorBytes()) {
boundStatement = insertStatement.bind(event.rowKey()[0], event.rowKey()[1], event.persistentActorId(), event.persistentActorBytes());
} else {
// it's a delete
boundStatement = deleteStatement.bind(event.rowKey()[0], event.rowKey()[1], event.persistentActorId());
}
// execute the statement
executeWithRetry(cassandraSession, boundStatement, logger);
} else {
// check the protocol to see if BatchStatements are supported
ProtocolVersion protocolVersion = cassandraSession.getCluster().getConfiguration().getProtocolOptions().getProtocolVersion();
if(ProtocolVersion.V1.equals(protocolVersion)) {
if(this.optimizedV1Batches) {
executeBatchV1Optimized(events);
} else {
executeBatchV1(events);
}
} else {
executeBatchV2AndUp(events);
}
}
processEvents(events);
} catch(Exception e) {
executionException = e;
} finally {
Expand All @@ -144,6 +123,32 @@ public void process(List<PersistentActorUpdateEvent> events) {
}
}

/**
 * Executes the given update events: a single event is executed as one bound
 * insert/delete statement, multiple events are dispatched to the
 * protocol-appropriate batch strategy. This method is also the re-entry point
 * used by the BatchTooLargeException handlers when an oversized batch is
 * split in two and each half is retried.
 *
 * @param events the update events to execute; an empty list is a no-op
 */
private void processEvents(List<PersistentActorUpdateEvent> events) {
    if (events.isEmpty()) {
        // nothing to do; also keeps the recursive batch-splitting retries safe
        // and matches the cassandra4 implementation of this method
        return;
    }
    // optimized to use the prepared statement
    if (events.size() == 1) {
        PersistentActorUpdateEvent event = events.get(0);
        BoundStatement boundStatement;
        if (event.hasPersistentActorBytes()) {
            boundStatement = insertStatement.bind(event.rowKey()[0], event.rowKey()[1], event.persistentActorId(), event.persistentActorBytes());
        } else {
            // no payload bytes means this event is a delete
            boundStatement = deleteStatement.bind(event.rowKey()[0], event.rowKey()[1], event.persistentActorId());
        }
        // execute the statement
        executeWithRetry(cassandraSession, boundStatement, logger);
    } else {
        // check the protocol to see if BatchStatements are supported
        if (ProtocolVersion.V1.equals(protocolVersion)) {
            if (this.optimizedV1Batches) {
                executeBatchV1Optimized(events);
            } else {
                executeBatchV1(events);
            }
        } else {
            executeBatchV2AndUp(events);
        }
    }
}

private void executeBatchV1(List<PersistentActorUpdateEvent> events) {
List<Object> arguments = new LinkedList<>();
StringBuilder batchBuilder = new StringBuilder("BEGIN UNLOGGED BATCH ");
Expand All @@ -170,7 +175,19 @@ private void executeBatchV1(List<PersistentActorUpdateEvent> events) {
batchBuilder.append("APPLY BATCH");
// @todo: this causes a warning, but doing it without seems to fail with binary values!
PreparedStatement batchStatement = cassandraSession.prepare(batchBuilder.toString());
executeWithRetry(cassandraSession, batchStatement.bind(arguments.toArray()), logger);
try {
executeWithRetry(cassandraSession, batchStatement.bind(arguments.toArray()), logger);
} catch(BatchTooLargeException e) {
int half = events.size() / 2;
// batch is too large, so we need to split it up
logger.warn(
"Batch of byteSize {} is too large, splitting up in 2 batches. 1 of {} events and 1 of {} events",
e.getBatchSize(),
half,
events.size() - half);
processEvents(events.subList(0, half));
processEvents(events.subList(half, events.size()));
}
}

private void executeBatchV1Optimized(List<PersistentActorUpdateEvent> events) {
Expand All @@ -195,7 +212,19 @@ private void executeBatchV1Optimized(List<PersistentActorUpdateEvent> events) {
batchStatement = batchStatements.get(batchSize);
}
if(batchStatement != null) {
executeWithRetry(cassandraSession, batchStatement.bind(arguments.toArray()), logger);
try {
executeWithRetry(cassandraSession, batchStatement.bind(arguments.toArray()), logger);
} catch(BatchTooLargeException e) {
int half = events.size() / 2;
// batch is too large, so we need to split it up
logger.warn(
"Batch of byteSize {} is too large, splitting up in 2 batches. 1 of {} events and 1 of {} events",
e.getBatchSize(),
half,
events.size() - half);
processEvents(events.subList(0, half));
processEvents(events.subList(half, events.size()));
}
} else {
// fallback to non-optimized version
executeBatchV1(events);
Expand All @@ -212,7 +241,19 @@ private void executeBatchV2AndUp(List<PersistentActorUpdateEvent> events) {
batchStatement.add(deleteStatement.bind(event.rowKey()[0], event.rowKey()[1], event.persistentActorId()));
}
}
executeWithRetry(cassandraSession, batchStatement, logger);
try {
executeWithRetry(cassandraSession, batchStatement, logger);
} catch(BatchTooLargeException e) {
int half = events.size() / 2;
// batch is too large, so we need to split it up
logger.warn(
"Batch of byteSize {} is too large, splitting up in 2 batches. 1 of {} events and 1 of {} events",
e.getBatchSize(),
half,
events.size() - half);
processEvents(events.subList(0, half));
processEvents(events.subList(half, events.size()));
}
}


Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -17,19 +17,13 @@

package org.elasticsoftware.elasticactors.cassandra2.util;

import com.datastax.driver.core.ResultSet;
import com.datastax.driver.core.Session;
import com.datastax.driver.core.Statement;
import com.datastax.driver.core.exceptions.BootstrappingException;
import com.datastax.driver.core.exceptions.ConnectionException;
import com.datastax.driver.core.exceptions.CoordinatorException;
import com.datastax.driver.core.exceptions.DriverException;
import com.datastax.driver.core.exceptions.OverloadedException;
import com.datastax.driver.core.exceptions.QueryConsistencyException;
import com.datastax.driver.core.exceptions.UnavailableException;
import com.datastax.driver.core.*;
import com.datastax.driver.core.exceptions.*;
import org.elasticsoftware.elasticactors.cassandra2.state.BatchTooLargeException;
import org.slf4j.Logger;

import java.net.InetAddress;
import java.util.Optional;

/**
* @author Joost van de Wijgerd
Expand All @@ -44,13 +38,29 @@ public static ResultSet executeWithRetry(Session cassandraSession, Statement sta
try {
return cassandraSession.execute(statement);
} catch (ConnectionException | OverloadedException | QueryConsistencyException | BootstrappingException e) {
logger.warn("{} on node {} while executing statement, retry attempt {}", e.getClass().getSimpleName(), e.getHost(), attempts);
logger.warn("{} on node {} while executing statement, retry attempt {}", e.getClass().getSimpleName(), e.getEndPoint(), attempts);
latestException = e;
} catch(UnavailableException e) {
logger.error("node {} is reporting not enough replicas available, will not retry", e.getHost());
logger.error("node {} is reporting not enough replicas available, will not retry",
Optional.ofNullable(e.getEndPoint()).map(endPoint -> endPoint.resolve().toString()).orElse("UNKNOWN"));
throw e;
} catch(InvalidQueryException e) {
logger.error("InvalidQueryException with message {} on node {} while executing statement, will retry in case of BatchStatement",
e.getMessage(),
Optional.of(e.getEndPoint()).map(endPoint -> endPoint.resolve().toString()).orElse("UNKNOWN"));
if(statement instanceof BatchStatement batch) {
Configuration cassandraConfiguration = cassandraSession.getCluster().getConfiguration();
throw new BatchTooLargeException(batch,batch.requestSizeInBytes(
cassandraConfiguration.getProtocolOptions().getProtocolVersion(),
cassandraConfiguration.getCodecRegistry() ));
} else {
throw e;
}
} catch(RuntimeException e) {
InetAddress node = (e instanceof CoordinatorException) ? ((CoordinatorException)e).getHost() : null;
InetAddress node = (e instanceof CoordinatorException) ?
Optional.ofNullable(((CoordinatorException)e).getEndPoint())
.map(endPoint -> endPoint.resolve().getAddress()).orElse(null)
: null;
logger.error("{} on node {} while executing statement, will not retry", e.getClass().getSimpleName(), node);
throw e;
}
Expand Down
15 changes: 15 additions & 0 deletions main/backplane-cassandra4/pom.xml
Original file line number Diff line number Diff line change
Expand Up @@ -55,6 +55,21 @@
<groupId>org.springframework</groupId>
<artifactId>spring-context</artifactId>
</dependency>
<dependency>
<groupId>org.mockito</groupId>
<artifactId>mockito-core</artifactId>
<scope>test</scope>
</dependency>
<dependency>
<groupId>org.testng</groupId>
<artifactId>testng</artifactId>
<scope>test</scope>
</dependency>
<dependency>
<groupId>org.slf4j</groupId>
<artifactId>slf4j-simple</artifactId>
<scope>test</scope>
</dependency>
</dependencies>


Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,40 @@
/*
* Copyright 2013 - 2023 The Original Authors
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*
*/

package org.elasticsoftware.elasticactors.cassandra4.state;

import com.datastax.oss.driver.api.core.cql.BatchStatement;

/**
 * Signals that a {@link BatchStatement} exceeded the maximum request size the
 * cluster will accept. Carries the offending batch and its serialized size so
 * the caller can recover, e.g. by splitting the batch and retrying.
 */
public class BatchTooLargeException extends RuntimeException {
    private final BatchStatement originalBatch;
    private final int batchSize;

    /**
     * @param originalBatch the batch that was rejected by the cluster
     * @param batchSize     the serialized size (in bytes) of the rejected batch
     */
    public BatchTooLargeException(BatchStatement originalBatch, int batchSize) {
        super(String.format("BatchStatement of size %d too large to execute", batchSize));
        this.originalBatch = originalBatch;
        this.batchSize = batchSize;
    }

    /** @return the serialized size (in bytes) of the rejected batch */
    public int getBatchSize() {
        return batchSize;
    }

    /** @return the batch that was rejected by the cluster */
    public BatchStatement getOriginalBatch() {
        return originalBatch;
    }
}
Original file line number Diff line number Diff line change
Expand Up @@ -60,21 +60,7 @@ public void process(List<PersistentActorUpdateEvent> events) {
Exception executionException = null;
final long startTime = logger.isTraceEnabled() ? System.nanoTime() : 0L;
try {
// optimized to use the prepared statement
if(events.size() == 1) {
PersistentActorUpdateEvent event = events.get(0);
BoundStatement boundStatement;
if (event.hasPersistentActorBytes()) {
boundStatement = insertStatement.bind(event.rowKey()[0], event.rowKey()[1], event.persistentActorId(), event.persistentActorBytes());
} else {
// it's a delete
boundStatement = deleteStatement.bind(event.rowKey()[0], event.rowKey()[1], event.persistentActorId());
}
// execute the statement
executeWithRetry(cassandraSession, boundStatement, logger);
} else {
executeBatchV3AndUp(events);
}
processEvents(events);
} catch(Exception e) {
executionException = e;
} finally {
Expand All @@ -99,6 +85,24 @@ public void process(List<PersistentActorUpdateEvent> events) {
}
}

/**
 * Executes the given update events: a single event is run as one bound
 * insert/delete statement, multiple events are delegated to the batch path.
 * An empty list is a no-op, which keeps the recursive batch-splitting
 * retries safe.
 *
 * @param events the update events to execute
 */
private void processEvents(List<PersistentActorUpdateEvent> events) {
    if (events.isEmpty()) {
        // nothing to execute
        return;
    }
    if (events.size() > 1) {
        executeBatchV3AndUp(events);
        return;
    }
    // single event: bind the prepared statement directly
    PersistentActorUpdateEvent event = events.get(0);
    final BoundStatement boundStatement;
    if (event.hasPersistentActorBytes()) {
        boundStatement = insertStatement.bind(event.rowKey()[0], event.rowKey()[1], event.persistentActorId(), event.persistentActorBytes());
    } else {
        // no payload bytes means this event is a delete
        boundStatement = deleteStatement.bind(event.rowKey()[0], event.rowKey()[1], event.persistentActorId());
    }
    executeWithRetry(cassandraSession, boundStatement, logger);
}

private void executeBatchV3AndUp(List<PersistentActorUpdateEvent> events) {
BatchStatement batchStatement = BatchStatement.newInstance(UNLOGGED);
for (PersistentActorUpdateEvent event : events) {
Expand All @@ -109,9 +113,18 @@ private void executeBatchV3AndUp(List<PersistentActorUpdateEvent> events) {
batchStatement.add(deleteStatement.bind(event.rowKey()[0], event.rowKey()[1], event.persistentActorId()));
}
}
executeWithRetry(cassandraSession, batchStatement, logger);
try {
executeWithRetry(cassandraSession, batchStatement, logger);
} catch (BatchTooLargeException e) {
int half = events.size() / 2;
// batch is too large, so we need to split it up
logger.warn(
"Batch of byteSize {} is too large, splitting up in 2 batches. 1 of {} events and 1 of {} events",
e.getBatchSize(),
half,
events.size() - half);
processEvents(events.subList(0, half));
processEvents(events.subList(half, events.size()));
}
}



}
Loading

0 comments on commit 36147d0

Please sign in to comment.