Skip to content

Commit

Permalink
Rename MySQLBaseBinlogEvent (apache#32563)
Browse files Browse the repository at this point in the history
* Rename MySQLBaseBinlogEvent

* Refactor MySQLWriteRowsBinlogEvent

* Refactor MySQLUpdateRowsBinlogEvent

* Refactor MySQLDeleteRowsBinlogEvent
  • Loading branch information
terrymanu authored Aug 16, 2024
1 parent 59455de commit f0f1c2a
Show file tree
Hide file tree
Showing 13 changed files with 136 additions and 142 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -21,11 +21,11 @@
import lombok.Setter;

/**
* Abstract binlog event.
* MySQL base binlog event.
*/
@Getter
@Setter
public abstract class AbstractBinlogEvent {
public abstract class MySQLBaseBinlogEvent {

private String fileName;

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -20,5 +20,5 @@
/**
* Placeholder binlog event, unsupported binlog event will replace it into this class.
*/
public final class PlaceholderEvent extends AbstractBinlogEvent {
public final class PlaceholderBinlogEvent extends MySQLBaseBinlogEvent {
}
Original file line number Diff line number Diff line change
Expand Up @@ -15,10 +15,11 @@
* limitations under the License.
*/

package org.apache.shardingsphere.data.pipeline.mysql.ingest.incremental.binlog.event;
package org.apache.shardingsphere.data.pipeline.mysql.ingest.incremental.binlog.event.query;

import lombok.Getter;
import lombok.RequiredArgsConstructor;
import org.apache.shardingsphere.data.pipeline.mysql.ingest.incremental.binlog.event.MySQLBaseBinlogEvent;

/**
* Query event.This event is written into the binary log file for:
Expand All @@ -30,7 +31,7 @@
*/
@RequiredArgsConstructor
@Getter
public final class QueryEvent extends AbstractBinlogEvent {
public final class MySQLQueryBinlogEvent extends MySQLBaseBinlogEvent {

private final long threadId;

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -15,17 +15,18 @@
* limitations under the License.
*/

package org.apache.shardingsphere.data.pipeline.mysql.ingest.incremental.binlog.event;
package org.apache.shardingsphere.data.pipeline.mysql.ingest.incremental.binlog.event.rows;

import lombok.Getter;
import lombok.Setter;
import org.apache.shardingsphere.data.pipeline.mysql.ingest.incremental.binlog.event.MySQLBaseBinlogEvent;

/**
* Abstract rows event.
* MySQL rows base event.
*/
@Getter
@Setter
public abstract class AbstractRowsEvent extends AbstractBinlogEvent {
public abstract class MySQLBaseRowsBinlogEvent extends MySQLBaseBinlogEvent {

private String databaseName;

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -15,20 +15,20 @@
* limitations under the License.
*/

package org.apache.shardingsphere.data.pipeline.mysql.ingest.incremental.binlog.event;
package org.apache.shardingsphere.data.pipeline.mysql.ingest.incremental.binlog.event.rows;

import lombok.Getter;
import lombok.Setter;
import lombok.RequiredArgsConstructor;

import java.io.Serializable;
import java.util.List;

/**
* Delete rows event.
* MySQL delete rows binlog event.
*/
@RequiredArgsConstructor
@Getter
@Setter
public final class DeleteRowsEvent extends AbstractRowsEvent {
public final class MySQLDeleteRowsBinlogEvent extends MySQLBaseRowsBinlogEvent {

private List<Serializable[]> beforeRows;
private final List<Serializable[]> beforeRows;
}
Original file line number Diff line number Diff line change
Expand Up @@ -15,22 +15,22 @@
* limitations under the License.
*/

package org.apache.shardingsphere.data.pipeline.mysql.ingest.incremental.binlog.event;
package org.apache.shardingsphere.data.pipeline.mysql.ingest.incremental.binlog.event.rows;

import lombok.Getter;
import lombok.Setter;
import lombok.RequiredArgsConstructor;

import java.io.Serializable;
import java.util.List;

/**
* Update rows event.
* MySQL update rows binlog event.
*/
@RequiredArgsConstructor
@Getter
@Setter
public final class UpdateRowsEvent extends AbstractRowsEvent {
public final class MySQLUpdateRowsBinlogEvent extends MySQLBaseRowsBinlogEvent {

private List<Serializable[]> beforeRows;
private final List<Serializable[]> beforeRows;

private List<Serializable[]> afterRows;
private final List<Serializable[]> afterRows;
}
Original file line number Diff line number Diff line change
Expand Up @@ -15,20 +15,20 @@
* limitations under the License.
*/

package org.apache.shardingsphere.data.pipeline.mysql.ingest.incremental.binlog.event;
package org.apache.shardingsphere.data.pipeline.mysql.ingest.incremental.binlog.event.rows;

import lombok.Getter;
import lombok.Setter;
import lombok.RequiredArgsConstructor;

import java.io.Serializable;
import java.util.List;

/**
* Write rows event.
* MySQL write rows binlog event.
*/
@RequiredArgsConstructor
@Getter
@Setter
public final class WriteRowsEvent extends AbstractRowsEvent {
public final class MySQLWriteRowsBinlogEvent extends MySQLBaseRowsBinlogEvent {

private List<Serializable[]> afterRows;
private final List<Serializable[]> afterRows;
}
Original file line number Diff line number Diff line change
Expand Up @@ -15,10 +15,11 @@
* limitations under the License.
*/

package org.apache.shardingsphere.data.pipeline.mysql.ingest.incremental.binlog.event;
package org.apache.shardingsphere.data.pipeline.mysql.ingest.incremental.binlog.event.transaction;

import lombok.Getter;
import lombok.RequiredArgsConstructor;
import org.apache.shardingsphere.data.pipeline.mysql.ingest.incremental.binlog.event.MySQLBaseBinlogEvent;

/**
* XID event is generated for a COMMIT of a transaction that modifies one or more tables of an XA-capable storage engine.
Expand All @@ -27,7 +28,7 @@
*/
@RequiredArgsConstructor
@Getter
public final class XidEvent extends AbstractBinlogEvent {
public final class MySQLXidBinlogEvent extends MySQLBaseBinlogEvent {

private final long xid;
}
Original file line number Diff line number Diff line change
Expand Up @@ -34,8 +34,8 @@
import lombok.SneakyThrows;
import lombok.extern.slf4j.Slf4j;
import org.apache.shardingsphere.data.pipeline.core.exception.PipelineInternalException;
import org.apache.shardingsphere.data.pipeline.mysql.ingest.incremental.binlog.event.AbstractBinlogEvent;
import org.apache.shardingsphere.data.pipeline.mysql.ingest.incremental.binlog.event.PlaceholderEvent;
import org.apache.shardingsphere.data.pipeline.mysql.ingest.incremental.binlog.event.MySQLBaseBinlogEvent;
import org.apache.shardingsphere.data.pipeline.mysql.ingest.incremental.binlog.event.PlaceholderBinlogEvent;
import org.apache.shardingsphere.data.pipeline.mysql.ingest.incremental.client.netty.MySQLBinlogEventPacketDecoder;
import org.apache.shardingsphere.data.pipeline.mysql.ingest.incremental.client.netty.MySQLCommandPacketDecoder;
import org.apache.shardingsphere.data.pipeline.mysql.ingest.incremental.client.netty.MySQLNegotiateHandler;
Expand Down Expand Up @@ -77,7 +77,7 @@ public final class MySQLBinlogClient {

private final boolean decodeWithTX;

private final ArrayBlockingQueue<List<AbstractBinlogEvent>> blockingEventQueue = new ArrayBlockingQueue<>(2500);
private final ArrayBlockingQueue<List<MySQLBaseBinlogEvent>> blockingEventQueue = new ArrayBlockingQueue<>(2500);

private EventLoopGroup eventLoopGroup;

Expand Down Expand Up @@ -226,8 +226,8 @@ private void dumpBinlog(final String binlogFileName, final long binlogPosition,
channel.writeAndFlush(new MySQLComBinlogDumpCommandPacket((int) binlogPosition, connectInfo.getServerId(), binlogFileName));
}

private AbstractBinlogEvent getLastBinlogEvent(final String binlogFileName, final long binlogPosition) {
PlaceholderEvent result = new PlaceholderEvent();
private MySQLBaseBinlogEvent getLastBinlogEvent(final String binlogFileName, final long binlogPosition) {
PlaceholderBinlogEvent result = new PlaceholderBinlogEvent();
result.setFileName(binlogFileName);
result.setPosition(binlogPosition);
return result;
Expand All @@ -242,12 +242,12 @@ private void resetSequenceID() {
*
* @return binlog event
*/
public synchronized List<AbstractBinlogEvent> poll() {
public synchronized List<MySQLBaseBinlogEvent> poll() {
if (!running) {
return Collections.emptyList();
}
try {
List<AbstractBinlogEvent> result = blockingEventQueue.poll(100L, TimeUnit.MILLISECONDS);
List<MySQLBaseBinlogEvent> result = blockingEventQueue.poll(100L, TimeUnit.MILLISECONDS);
return null == result ? Collections.emptyList() : result;
} catch (final InterruptedException ignored) {
Thread.currentThread().interrupt();
Expand Down Expand Up @@ -314,11 +314,11 @@ public void exceptionCaught(final ChannelHandlerContext ctx, final Throwable cau

private final class MySQLBinlogEventHandler extends ChannelInboundHandlerAdapter {

private final AtomicReference<AbstractBinlogEvent> lastBinlogEvent;
private final AtomicReference<MySQLBaseBinlogEvent> lastBinlogEvent;

private final AtomicBoolean reconnectRequested = new AtomicBoolean(false);

MySQLBinlogEventHandler(final AbstractBinlogEvent lastBinlogEvent) {
MySQLBinlogEventHandler(final MySQLBaseBinlogEvent lastBinlogEvent) {
this.lastBinlogEvent = new AtomicReference<>(lastBinlogEvent);
}

Expand All @@ -329,7 +329,7 @@ public void channelRead(final ChannelHandlerContext ctx, final Object msg) throw
return;
}
if (msg instanceof List) {
List<AbstractBinlogEvent> records = (List<AbstractBinlogEvent>) msg;
List<MySQLBaseBinlogEvent> records = (List<MySQLBaseBinlogEvent>) msg;
if (records.isEmpty()) {
log.warn("The records is empty");
return;
Expand All @@ -338,8 +338,8 @@ public void channelRead(final ChannelHandlerContext ctx, final Object msg) throw
blockingEventQueue.put(records);
return;
}
if (msg instanceof AbstractBinlogEvent) {
lastBinlogEvent.set((AbstractBinlogEvent) msg);
if (msg instanceof MySQLBaseBinlogEvent) {
lastBinlogEvent.set((MySQLBaseBinlogEvent) msg);
blockingEventQueue.put(Collections.singletonList(lastBinlogEvent.get()));
}
}
Expand Down
Loading

0 comments on commit f0f1c2a

Please sign in to comment.