Skip to content

Commit

Permalink
implement switchover from master branch
Browse files Browse the repository at this point in the history
  • Loading branch information
harshit-digicert committed Dec 11, 2024
1 parent c01f2c0 commit 871caa4
Showing 1 changed file with 243 additions and 5 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -18,11 +18,7 @@
import java.sql.SQLException;
import java.time.Duration;
import java.time.Instant;
import java.util.EnumMap;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.Set;
import java.util.*;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;
Expand All @@ -36,6 +32,12 @@
import javax.net.ssl.TrustManagerFactory;
import javax.net.ssl.X509TrustManager;

import org.apache.kafka.clients.producer.KafkaProducer;
import org.apache.kafka.clients.producer.Producer;
import org.apache.kafka.clients.producer.ProducerConfig;
import org.apache.kafka.clients.producer.ProducerRecord;
import org.apache.kafka.common.PartitionInfo;
import org.apache.kafka.common.serialization.StringSerializer;
import org.apache.kafka.connect.source.SourceConnector;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
Expand Down Expand Up @@ -138,6 +140,13 @@ public abstract class BinlogStreamingChangeEventSource<P extends BinlogPartition
@SingleThreadAccess("binlog client thread")
protected Instant eventTimestamp;

private final int MAX_RETRY_FOR_BINLOG_RESET = 2;
private int currentBinlogResetCount = 0;
private Long previousMasterId;
private Long currentMasterId;
private Producer<String, String> kafkaProducer;
private final String ERROR_TOPIC = "src_dcom_debezium_connector_error";

public BinlogStreamingChangeEventSource(BinlogConnectorConfig connectorConfig,
BinlogConnectorConnection connection,
EventDispatcher<P, TableId> dispatcher,
Expand All @@ -161,6 +170,36 @@ public BinlogStreamingChangeEventSource(BinlogConnectorConfig connectorConfig,
this.client = createBinaryLogClient(taskContext, connectorConfig, binaryLogClientThreads, connection);
this.gtidDmlSourceFilter = getGtidDmlSourceFilter();
this.isGtidModeEnabled = connection.isGtidModeEnabled();

try {
kafkaProducer = getKafkaProducer();
}
catch (Exception e) {
LOGGER.error("Error while creating kafka producer for error topic", e);
}
}

private Producer<String, String> getKafkaProducer() {
if (kafkaProducer != null)
return kafkaProducer;
KafkaProducer<String, String> kafkaProducer = new KafkaProducer<>(getProperties());
checkConnectivity(kafkaProducer);
return kafkaProducer;
}

private void checkConnectivity(KafkaProducer<String, String> kafkaProducer) {
List<PartitionInfo> partitionInfos = kafkaProducer.partitionsFor(ERROR_TOPIC);
for (PartitionInfo info : partitionInfos) {
LOGGER.info("Kafka producer is connected to topic: {} and partition: {}", info.topic(), info.partition());
}
}

private Properties getProperties() {
Properties props = new Properties();
props.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class.getName());
props.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, StringSerializer.class.getName());
props.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, connectorConfig.getConfig().getString("schema.history.internal.kafka.bootstrap.servers"));
return props;
}

@Override
Expand Down Expand Up @@ -1078,6 +1117,10 @@ private <T extends EventData, U> void handleChange(P partition,
BinlogChangeEmitter<U> changeEmitter,
ChangeEventValidator<U> changeEventValidator)
throws InterruptedException {
if (currentBinlogResetCount > 0) {
currentBinlogResetCount = 0;
LOGGER.info("Resetting currentBinlogResetCount to {} after binlog offset is reset properly and data is flowing", currentBinlogResetCount);
}
if (skipEvent) {
// We can skip this because we should already be at least this far ...
LOGGER.info("Skipping previously processed row event: {}", event);
Expand Down Expand Up @@ -1245,6 +1288,11 @@ public void onCommunicationFailure(BinaryLogClient client, Exception ex) {
catch (final Exception e) {
LOGGER.debug("Exception while closing client", e);
}
LOGGER.error("Communication failure error : ", ex);
currentMasterId = getMasterServerId();
sendDatabaseSwitchMessage(client, ex.getMessage());
// errorHandler.setProducerThrowable(wrap(ex));
resetBinlogOffset(client, ex);
errorHandler.setProducerThrowable(wrap(ex));
}

Expand All @@ -1264,6 +1312,92 @@ else if (eventDeserializationFailureHandlingMode == EventProcessingFailureHandli
logStreamingSourceState(Level.DEBUG);
}
}

private void resetBinlogOffset(BinaryLogClient client, Exception ex) {
if (currentBinlogResetCount < MAX_RETRY_FOR_BINLOG_RESET) {
LOGGER.error("Trying to set new offset for the server");
final String showMasterStmt = "SHOW MASTER STATUS";
try {
connection.isValid();
connection.query(showMasterStmt, rs -> {
if (rs.next()) {
final String newBinlogFilename = rs.getString(1);
final long binlogPosition = rs.getLong(2);
client.setBinlogFilename(newBinlogFilename);
client.setBinlogPosition(binlogPosition);
offsetContext.setBinlogStartPoint(newBinlogFilename, binlogPosition);
LOGGER.error(" New offset : binlog '{}' at position '{}'", newBinlogFilename, binlogPosition);
currentBinlogResetCount++;
LOGGER.error("Retry count for binlog reset is {}", currentBinlogResetCount);
}
else {
LOGGER.error("Unexpected response to '{}': no results were returned", showMasterStmt);
errorHandler.setProducerThrowable(wrap(ex));
}
});
client.connect();
}
catch (Exception e) {
LOGGER.error("Error while resetting binlog offset", e);
errorHandler.setProducerThrowable(wrap(ex));
}
}
else {
LOGGER.error("Max retry for binlog reset reached");
errorHandler.setProducerThrowable(wrap(ex));
}
}

private Long getMasterServerId() {
previousMasterId = currentMasterId;
LOGGER.info("Previous master server id: {}", previousMasterId);
Long[] id = new Long[1];
String serverIdQuery = "SELECT @@server_id";
try {
connection.isValid();
connection.query(serverIdQuery, rs -> {
if (rs.next()) {
id[0] = rs.getLong(1);
LOGGER.info("Current master server id: {}", id[0]);
}
else {
LOGGER.info("Unexpected response to '{}': no results were returned", serverIdQuery);
}
});
}
catch (Exception e) {
LOGGER.error("Error while getting master server id", e);
return null;
}
return id[0];
}

private void sendDatabaseSwitchMessage(BinaryLogClient client, String error) {
try {
if (previousMasterId != null && currentMasterId != null && !previousMasterId.equals(currentMasterId)) {
DatabaseSwitchMessagae messagae = createDatabaseSwitchMessage(client, error);
Producer<String, String> kafkaProducer = getKafkaProducer();
kafkaProducer.send(new ProducerRecord<>(ERROR_TOPIC, messagae.toString()));
}
}
catch (Exception e) {
LOGGER.error("Error while sending error message to kafka topic ", e);
}
}

private DatabaseSwitchMessagae createDatabaseSwitchMessage(BinaryLogClient client, String error) {
DatabaseSwitchMessagae messagae = new DatabaseSwitchMessagae();
messagae.setConnectorName(connectorConfig.getConfig().getString("name"));
messagae.setTopicName(connectorConfig.getConfig().getString("topic.prefix"));
messagae.setTableName(connectorConfig.getConfig().getString("table.include.list"));
messagae.setHostName(connectorConfig.getHostName());
messagae.setPreviousMasterId(previousMasterId);
messagae.setNewMasterId(currentMasterId);
messagae.setLastOffset(client.getBinlogFilename() + "/" + client.getBinlogPosition());
messagae.setError(error);
messagae.setLogDate(Instant.now().toString());
return messagae;
}
}

private SSLMode sslModeFor(SecureConnectionMode mode) {
Expand Down Expand Up @@ -1433,4 +1567,108 @@ public boolean equals(Object obj) {
return true;
}
}

public class DatabaseSwitchMessagae {
private String connectorName;
private String topicName;
private String tableName;
private String hostName;
private Long previousMasterId;
private Long newMasterId;
private String lastOffset;
private String error;
private String logDate;

// Getter methods
public String getConnectorName() {
return connectorName;
}

public String getTopicName() {
return topicName;
}

public String getTableName() {
return tableName;
}

public Long getPreviousMasterId() {
return previousMasterId;
}

public Long getNewMasterId() {
return newMasterId;
}

public String getLogDate() {
return logDate;
}

public String getHostName() {
return hostName;
}

public String getLastOffset() {
return lastOffset;
}

public String getError() {
return error;
}

// Setter methods
public void setConnectorName(String connectorName) {
this.connectorName = connectorName;
}

public void setTopicName(String topicName) {
this.topicName = topicName;
}

public void setTableName(String tableName) {
this.tableName = tableName;
}

public void setPreviousMasterId(Long previousMasterId) {
this.previousMasterId = previousMasterId;
}

public void setNewMasterId(Long newMasterId) {
this.newMasterId = newMasterId;
}

public void setLogDate(String logDate) {
this.logDate = logDate;
}

public void setHostName(String hostName) {
this.hostName = hostName;
}

public void setLastOffset(String lastOffset) {
this.lastOffset = lastOffset;
}

public void setError(String error) {
this.error = error;
}

// toString method
@Override
public String toString() {
StringBuilder jsonString = new StringBuilder();
jsonString.append("{\n");
jsonString.append(" \"connector_Name\": \"").append(connectorName).append("\",\n");
jsonString.append(" \"topic_name\": \"").append(topicName).append("\",\n");
jsonString.append(" \"table_name\": \"").append(tableName).append("\",\n");
jsonString.append(" \"host_name\": \"").append(hostName).append("\",\n");
jsonString.append(" \"previous_master_id\": \"").append(previousMasterId).append("\",\n");
jsonString.append(" \"new_master_id\": \"").append(newMasterId).append("\",\n");
jsonString.append(" \"last_offset\": \"").append(lastOffset).append("\",\n");
jsonString.append(" \"error_message\": \"").append(error).append("\",\n");
jsonString.append(" \"log_date\": \"").append(logDate).append("\"\n");
jsonString.append("}");
return jsonString.toString();
}
}
}

0 comments on commit 871caa4

Please sign in to comment.