diff --git a/redis/redis-core/src/main/java/com/ctrip/xpipe/redis/core/protocal/Psync.java b/redis/redis-core/src/main/java/com/ctrip/xpipe/redis/core/protocal/Psync.java index d16f21fd1f..ff4b5b5e3e 100644 --- a/redis/redis-core/src/main/java/com/ctrip/xpipe/redis/core/protocal/Psync.java +++ b/redis/redis-core/src/main/java/com/ctrip/xpipe/redis/core/protocal/Psync.java @@ -14,6 +14,7 @@ public interface Psync extends Command, Closeable { String FULL_SYNC = "FULLRESYNC"; String PARTIAL_SYNC = "CONTINUE"; long KEEPER_PARTIAL_SYNC_OFFSET = -2; + long KEEPER_FRESH_RDB_SYNC_OFFSET = -3; void addPsyncObserver(PsyncObserver observer); diff --git a/redis/redis-core/src/main/java/com/ctrip/xpipe/redis/core/protocal/cmd/FreshRdbOnlyPsync.java b/redis/redis-core/src/main/java/com/ctrip/xpipe/redis/core/protocal/cmd/FreshRdbOnlyPsync.java new file mode 100644 index 0000000000..0e502e9371 --- /dev/null +++ b/redis/redis-core/src/main/java/com/ctrip/xpipe/redis/core/protocal/cmd/FreshRdbOnlyPsync.java @@ -0,0 +1,22 @@ +package com.ctrip.xpipe.redis.core.protocal.cmd; + +import com.ctrip.xpipe.api.pool.SimpleObjectPool; +import com.ctrip.xpipe.netty.commands.NettyClient; +import com.ctrip.xpipe.redis.core.store.ReplicationStore; +import com.ctrip.xpipe.tuple.Pair; + +import java.util.concurrent.ScheduledExecutorService; + +public class FreshRdbOnlyPsync extends RdbOnlyPsync { + + public FreshRdbOnlyPsync(SimpleObjectPool clientPool, ReplicationStore store, ScheduledExecutorService scheduled) { + super(clientPool, store, scheduled); + } + + @Override + protected Pair getRequestMasterInfo() { + // psync ? -3 + return new Pair<>("?", KEEPER_FRESH_RDB_SYNC_OFFSET); + } + +} diff --git a/redis/redis-core/src/test/java/com/ctrip/xpipe/redis/core/protocal/cmd/FreshRdbOnlyPsyncTest.java b/redis/redis-core/src/test/java/com/ctrip/xpipe/redis/core/protocal/cmd/FreshRdbOnlyPsyncTest.java new file mode 100644 index 0000000000..3213090335 --- /dev/null +++ b/redis/redis-core/src/test/java/com/ctrip/xpipe/redis/core/protocal/cmd/FreshRdbOnlyPsyncTest.java @@ -0,0 +1,108 @@ +package com.ctrip.xpipe.redis.core.protocal.cmd; + +import com.ctrip.xpipe.api.command.CommandFuture; +import com.ctrip.xpipe.api.command.CommandFutureListener; +import com.ctrip.xpipe.api.endpoint.Endpoint; +import com.ctrip.xpipe.endpoint.DefaultEndPoint; +import com.ctrip.xpipe.netty.NettyPoolUtil; +import com.ctrip.xpipe.redis.core.AbstractRedisTest; +import com.ctrip.xpipe.redis.core.protocal.PsyncObserver; +import com.ctrip.xpipe.redis.core.protocal.protocal.EofType; +import com.ctrip.xpipe.redis.core.redis.RunidGenerator; +import com.ctrip.xpipe.redis.core.store.MetaStore; +import com.ctrip.xpipe.redis.core.store.RdbStore; +import com.ctrip.xpipe.redis.core.store.ReplicationStore; +import com.ctrip.xpipe.redis.core.store.ReplicationStoreManager; +import com.ctrip.xpipe.simpleserver.Server; +import org.junit.Assert; +import org.junit.Before; +import org.junit.Test; +import org.junit.runner.RunWith; +import org.mockito.Mock; +import org.mockito.junit.MockitoJUnitRunner; + +import java.io.IOException; +import java.util.Map; +import java.util.concurrent.CountDownLatch; +import java.util.concurrent.TimeUnit; +import java.util.concurrent.atomic.AtomicInteger; +import java.util.function.Function; + +import static org.mockito.Mockito.*; + +@RunWith(MockitoJUnitRunner.Silent.class) +public class FreshRdbOnlyPsyncTest extends AbstractRedisTest { + + @Mock + private ReplicationStore replicationStore; + + @Mock + private MetaStore metaStore; + + @Before + public void beforeDefaultPsyncTest() throws Exception{ + when(replicationStore.getMetaStore()).thenReturn(metaStore); + when(replicationStore.getEndOffset()).thenReturn(-1L); + + } + + @Test + public void testFreshRdbOnlyPsync() throws Exception { + String replId = RunidGenerator.DEFAULT.generateRunid(); + int offset = 100; + Server redisServer = startServer(randomPort(), new Function() { + @Override + public String apply(String s) { + logger.info("[testFreshRdbOnlyPsync] {}", s); + if (s.trim().equals("psync ? -3")) { + return String.format("+FULLRESYNC %s %d\r\n", replId, offset); + } else { + return "+OK\r\n"; + } + } + }); + Endpoint redisEndpoint = new DefaultEndPoint("127.0.0.1", redisServer.getPort()); + FreshRdbOnlyPsync psync = new FreshRdbOnlyPsync(NettyPoolUtil.createNettyPool(redisEndpoint), replicationStore, scheduled); + + CountDownLatch latch = new CountDownLatch(1); + AtomicInteger masetrOffset = new AtomicInteger(0); + psync.addPsyncObserver(new PsyncObserver() { + @Override + public void onFullSync(long masterRdbOffset) { + masetrOffset.set((int)masterRdbOffset); + latch.countDown(); + } + @Override + public void reFullSync() { + } + @Override + public void beginWriteRdb(EofType eofType, String replId, long masterRdbOffset) throws IOException { + } + @Override + public void endWriteRdb() { + } + @Override + public void onContinue(String requestReplId, String responseReplId) { + } + @Override + public void onKeeperContinue(String replId, long beginOffset) { + } + @Override + public void readAuxEnd(RdbStore rdbStore, Map auxMap) { + } + }); + psync.execute().addListener(new CommandFutureListener() { + + @Override + public void operationComplete(CommandFuture commandFuture) throws Exception { + if(!commandFuture.isSuccess()){ + logger.error("[operationComplete]", commandFuture.cause()); + } + } + }); + + latch.await(1000, TimeUnit.SECONDS); + Assert.assertEquals(offset, masetrOffset.get()); + } + +} diff --git a/redis/redis-keeper/src/main/java/com/ctrip/xpipe/redis/keeper/RedisKeeperServer.java b/redis/redis-keeper/src/main/java/com/ctrip/xpipe/redis/keeper/RedisKeeperServer.java index b5dd227114..ee870277f5 100644 --- a/redis/redis-keeper/src/main/java/com/ctrip/xpipe/redis/keeper/RedisKeeperServer.java +++ b/redis/redis-keeper/src/main/java/com/ctrip/xpipe/redis/keeper/RedisKeeperServer.java @@ -70,7 +70,11 @@ public static enum PROMOTION_STATE{ } - void fullSyncToSlave(RedisSlave redisSlave) throws IOException; + default void fullSyncToSlave(RedisSlave redisSlave) throws IOException { + fullSyncToSlave(redisSlave, false); + } + + void fullSyncToSlave(RedisSlave redisSlave, boolean freshRdbNeeded) throws IOException; void startIndexing() throws IOException; diff --git a/redis/redis-keeper/src/main/java/com/ctrip/xpipe/redis/keeper/RedisMaster.java b/redis/redis-keeper/src/main/java/com/ctrip/xpipe/redis/keeper/RedisMaster.java index 5e6e289659..a3cd96bd33 100644 --- a/redis/redis-keeper/src/main/java/com/ctrip/xpipe/redis/keeper/RedisMaster.java +++ b/redis/redis-keeper/src/main/java/com/ctrip/xpipe/redis/keeper/RedisMaster.java @@ -29,7 +29,7 @@ public interface RedisMaster extends RedisRole, Lifecycle, LifecycleStateAware, void reconnect(); - RdbDumper createRdbDumper(boolean tryRrodb) throws CreateRdbDumperException; + RdbDumper createRdbDumper(boolean tryRrodb, boolean freshRdbNeeded) throws CreateRdbDumperException; MASTER_STATE getMasterState(); diff --git a/redis/redis-keeper/src/main/java/com/ctrip/xpipe/redis/keeper/RedisSlave.java b/redis/redis-keeper/src/main/java/com/ctrip/xpipe/redis/keeper/RedisSlave.java index e6d592bdad..7b646a4903 100644 --- a/redis/redis-keeper/src/main/java/com/ctrip/xpipe/redis/keeper/RedisSlave.java +++ b/redis/redis-keeper/src/main/java/com/ctrip/xpipe/redis/keeper/RedisSlave.java @@ -46,7 +46,7 @@ public interface RedisSlave extends RedisClient, PartialAware void markPsyncProcessed(); /** - * If psync ? -1, slave start with no data, we should fsync immediately + * If psync ? -1 or psync xxxx 1, slave start with no data, we should fsync immediately */ void markColdStart(); diff --git a/redis/redis-keeper/src/main/java/com/ctrip/xpipe/redis/keeper/handler/keeper/AbstractSyncCommandHandler.java b/redis/redis-keeper/src/main/java/com/ctrip/xpipe/redis/keeper/handler/keeper/AbstractSyncCommandHandler.java index f04c3922cc..aebf96853c 100644 --- a/redis/redis-keeper/src/main/java/com/ctrip/xpipe/redis/keeper/handler/keeper/AbstractSyncCommandHandler.java +++ b/redis/redis-keeper/src/main/java/com/ctrip/xpipe/redis/keeper/handler/keeper/AbstractSyncCommandHandler.java @@ -70,6 +70,10 @@ public void run() { protected abstract void innerDoHandle(final String[] args, final RedisSlave redisSlave, RedisKeeperServer redisKeeperServer) throws IOException; protected void doFullSync(RedisSlave redisSlave) { + doFullSync(redisSlave, false); + } + + protected void doFullSync(RedisSlave redisSlave, boolean freshRdbNeeded) { try { if(logger.isInfoEnabled()){ @@ -83,7 +87,7 @@ protected void doFullSync(RedisSlave redisSlave) { String alert = String.format("FULL(M)<-%s[%s]", redisSlave.metaInfo(), redisKeeperServer.getReplId()); EventMonitor.DEFAULT.logAlertEvent(alert); - redisKeeperServer.fullSyncToSlave(redisSlave); + redisKeeperServer.fullSyncToSlave(redisSlave, freshRdbNeeded); redisKeeperServer.getKeeperMonitor().getKeeperStats().increaseFullSync(); } catch (IOException e) { logger.error("[doFullSync][close client]" + redisSlave, e); diff --git a/redis/redis-keeper/src/main/java/com/ctrip/xpipe/redis/keeper/handler/keeper/PsyncHandler.java b/redis/redis-keeper/src/main/java/com/ctrip/xpipe/redis/keeper/handler/keeper/PsyncHandler.java index 4c1198750e..51b4015529 100644 --- a/redis/redis-keeper/src/main/java/com/ctrip/xpipe/redis/keeper/handler/keeper/PsyncHandler.java +++ b/redis/redis-keeper/src/main/java/com/ctrip/xpipe/redis/keeper/handler/keeper/PsyncHandler.java @@ -13,6 +13,7 @@ import java.io.IOException; +import static com.ctrip.xpipe.redis.core.protocal.Psync.KEEPER_FRESH_RDB_SYNC_OFFSET; import static com.ctrip.xpipe.redis.core.protocal.Psync.KEEPER_PARTIAL_SYNC_OFFSET; /** @@ -36,14 +37,17 @@ protected void innerDoHandle(final String[] args, final RedisSlave redisSlave, R Long offsetRequest = Long.valueOf(args[1]); String replIdRequest = args[0]; - - if(replIdRequest.equals("?")){ + if (replIdRequest.equals("?") || offsetRequest == 1) { redisSlave.markColdStart(); + } + if(replIdRequest.equals("?")){ if (redisSlave.isKeeper() && offsetRequest.equals(KEEPER_PARTIAL_SYNC_OFFSET) && null != keeperRepl.replId()) { logger.info("[innerDoHandler][keeper psync]"); long continueOffset = keeperRepl.getEndOffset() + 1; // continue from next byte doKeeperPartialSync(redisSlave, keeperRepl.replId(), continueOffset); + } else if (redisSlave.isKeeper() && offsetRequest.equals(KEEPER_FRESH_RDB_SYNC_OFFSET)) { + doFullSync(redisSlave, true); } else { doFullSync(redisSlave); } diff --git a/redis/redis-keeper/src/main/java/com/ctrip/xpipe/redis/keeper/impl/DefaultRedisKeeperServer.java b/redis/redis-keeper/src/main/java/com/ctrip/xpipe/redis/keeper/impl/DefaultRedisKeeperServer.java index a810516e8a..f9a6c88d55 100644 --- a/redis/redis-keeper/src/main/java/com/ctrip/xpipe/redis/keeper/impl/DefaultRedisKeeperServer.java +++ b/redis/redis-keeper/src/main/java/com/ctrip/xpipe/redis/keeper/impl/DefaultRedisKeeperServer.java @@ -764,17 +764,17 @@ public void promoteSlave(String ip, int port) throws RedisSlavePromotionExceptio } @Override - public void fullSyncToSlave(final RedisSlave redisSlave) throws IOException { + public void fullSyncToSlave(final RedisSlave redisSlave, boolean freshRdbNeeded) throws IOException { logger.info("[fullSyncToSlave]{}, {}", redisSlave, rdbDumper.get()); - if (crossRegion.get() && !tryFullSyncToSlaveWithOthers(redisSlave)) { + if (crossRegion.get() && !redisSlave.isKeeper() && !tryFullSyncToSlaveWithOthers(redisSlave)) { redisSlave.waitForSeqFsync(); return; } boolean tryRordb = false; // slave and master all support rordb or not - if (redisSlave.capaOf(CAPA.RORDB) ) { + if (redisSlave.capaOf(CAPA.RORDB)) { try { logger.info("[fullSyncToSlave][rordb]{}", redisSlave); tryRordb = keeperRedisMaster.checkMasterSupportRordb().get(); @@ -786,28 +786,30 @@ public void fullSyncToSlave(final RedisSlave redisSlave) throws IOException { if(rdbDumper.get() == null){ logger.info("[fullSyncToSlave][dumper null]{}", redisSlave); - FullSyncListener fullSyncListener = new DefaultFullSyncListener(redisSlave); - FULLSYNC_FAIL_CAUSE failCause = getCurrentReplicationStore().fullSyncIfPossible(fullSyncListener, tryRordb); - if(null != failCause){ - if (FULLSYNC_PROGRESS_NOT_SUPPORTED.equals(failCause)) { + if (!freshRdbNeeded) { + FullSyncListener fullSyncListener = new DefaultFullSyncListener(redisSlave); + FULLSYNC_FAIL_CAUSE failCause = getCurrentReplicationStore().fullSyncIfPossible(fullSyncListener, tryRordb); + if (null == failCause) { + return; + } else if (FULLSYNC_PROGRESS_NOT_SUPPORTED.equals(failCause)) { logger.info("[fullSyncToSlave][progress not support][cancel slave]"); redisSlave.close(); return; } + } - try{ - RdbDumper newDumper = dumpNewRdb(tryRordb); - redisSlave.waitForRdbDumping(); - if (newDumper.future().isDone() && !newDumper.future().isSuccess()) { - logger.info("[fullSyncToSlave][new dumper fail immediatelly]"); - redisSlave.close(); - } - }catch(AbstractRdbDumperException e){ - logger.error("[fullSyncToSlave]", e); - if(e.isCancelSlave()){ - logger.info("[fullSyncToSlave][cancel slave]"); - redisSlave.close(); - } + try{ + RdbDumper newDumper = dumpNewRdb(tryRordb, freshRdbNeeded); + redisSlave.waitForRdbDumping(); + if (newDumper.future().isDone() && !newDumper.future().isSuccess()) { + logger.info("[fullSyncToSlave][new dumper fail immediatelly]"); + redisSlave.close(); + } + }catch(AbstractRdbDumperException e){ + logger.error("[fullSyncToSlave]", e); + if(e.isCancelSlave()){ + logger.info("[fullSyncToSlave][cancel slave]"); + redisSlave.close(); } } }else{ @@ -816,7 +818,6 @@ public void fullSyncToSlave(final RedisSlave redisSlave) throws IOException { } private synchronized boolean tryFullSyncToSlaveWithOthers(RedisSlave redisSlave) { - if (redisSlave.isKeeper()) return true; if (loadingSlaves.contains(redisSlave)) return true; int maxConcurrentLoadingSlaves = keeperConfig.getCrossRegionMaxLoadingSlavesCnt(); @@ -874,8 +875,12 @@ public boolean isStartIndexing() { } private RdbDumper dumpNewRdb(boolean tryRordb) throws CreateRdbDumperException, SetRdbDumperException { + return dumpNewRdb(tryRordb, false); + } + + private RdbDumper dumpNewRdb(boolean tryRordb, boolean freshRdbNeeded) throws CreateRdbDumperException, SetRdbDumperException { - RdbDumper rdbDumper = keeperRedisMaster.createRdbDumper(tryRordb); + RdbDumper rdbDumper = keeperRedisMaster.createRdbDumper(tryRordb, freshRdbNeeded); setRdbDumper(rdbDumper); rdbDumper.execute(); return rdbDumper; diff --git a/redis/redis-keeper/src/main/java/com/ctrip/xpipe/redis/keeper/impl/DefaultRedisMaster.java b/redis/redis-keeper/src/main/java/com/ctrip/xpipe/redis/keeper/impl/DefaultRedisMaster.java index 8cde955012..781b2f4438 100644 --- a/redis/redis-keeper/src/main/java/com/ctrip/xpipe/redis/keeper/impl/DefaultRedisMaster.java +++ b/redis/redis-keeper/src/main/java/com/ctrip/xpipe/redis/keeper/impl/DefaultRedisMaster.java @@ -136,13 +136,13 @@ public PARTIAL_STATE partialState() { @Override - public RdbDumper createRdbDumper(boolean tryRrodb) throws CreateRdbDumperException { + public RdbDumper createRdbDumper(boolean tryRrodb, boolean freshRdbNeeded) throws CreateRdbDumperException { if(masterState != MASTER_STATE.REDIS_REPL_CONNECTED){ logger.info("[createRdbDumper][master state not connected, dumper not allowed]{}", redisMasterReplication); throw new CreateRdbDumperException(this, "master state not connected, dumper not allowed:" + masterState); } - return new RedisMasterNewRdbDumper(this, redisKeeperServer, tryRrodb, rdbOnlyEventLoopGroup, scheduled, keeperResourceManager); + return new RedisMasterNewRdbDumper(this, redisKeeperServer, tryRrodb, freshRdbNeeded, rdbOnlyEventLoopGroup, scheduled, keeperResourceManager); } public MASTER_STATE getMasterState() { diff --git a/redis/redis-keeper/src/main/java/com/ctrip/xpipe/redis/keeper/impl/RdbonlyRedisMasterReplication.java b/redis/redis-keeper/src/main/java/com/ctrip/xpipe/redis/keeper/impl/RdbonlyRedisMasterReplication.java index e29dd033c9..afee3874f3 100644 --- a/redis/redis-keeper/src/main/java/com/ctrip/xpipe/redis/keeper/impl/RdbonlyRedisMasterReplication.java +++ b/redis/redis-keeper/src/main/java/com/ctrip/xpipe/redis/keeper/impl/RdbonlyRedisMasterReplication.java @@ -2,6 +2,7 @@ import com.ctrip.xpipe.api.server.PARTIAL_STATE; import com.ctrip.xpipe.redis.core.protocal.Psync; +import com.ctrip.xpipe.redis.core.protocal.cmd.FreshRdbOnlyPsync; import com.ctrip.xpipe.redis.core.protocal.cmd.RdbOnlyPsync; import com.ctrip.xpipe.redis.core.protocal.protocal.EofType; import com.ctrip.xpipe.redis.core.store.DumpedRdbStore; @@ -12,11 +13,11 @@ import com.ctrip.xpipe.redis.keeper.config.KeeperResourceManager; import com.ctrip.xpipe.redis.keeper.exception.psync.PsyncConnectMasterFailException; import com.ctrip.xpipe.redis.keeper.exception.psync.PsyncMasterRdbOffsetNotContinuousRuntimeException; -import com.ctrip.xpipe.redis.keeper.exception.psync.RdbOnlyPsyncReplIdNotSameException; import com.ctrip.xpipe.redis.keeper.exception.replication.UnexpectedReplIdException; import com.ctrip.xpipe.redis.keeper.store.RdbOnlyReplicationStore; import com.ctrip.xpipe.utils.VisibleForTesting; import io.netty.bootstrap.Bootstrap; +import io.netty.channel.Channel; import io.netty.channel.ChannelFuture; import io.netty.channel.ChannelFutureListener; import io.netty.channel.nio.NioEventLoopGroup; @@ -37,12 +38,23 @@ public class RdbonlyRedisMasterReplication extends AbstractRedisMasterReplicatio private final boolean tryRordb; - public RdbonlyRedisMasterReplication(RedisKeeperServer redisKeeperServer, RedisMaster redisMaster, boolean tryRordb, + enum REPL_STATE { + NORMAL_SYNC, + FAIL_FOR_NOT_CONTINUE, + FRESH_SYNC, + FAIL + } + + private REPL_STATE state; + + public RdbonlyRedisMasterReplication(RedisKeeperServer redisKeeperServer, RedisMaster redisMaster, + boolean tryRordb, boolean freshRdbNeeded, NioEventLoopGroup nioEventLoopGroup, ScheduledExecutorService scheduled, RdbDumper rdbDumper, KeeperResourceManager resourceManager) { super(redisKeeperServer, redisMaster, nioEventLoopGroup, scheduled, resourceManager); setRdbDumper(rdbDumper); this.tryRordb = tryRordb; + this.state = freshRdbNeeded ? REPL_STATE.FRESH_SYNC : REPL_STATE.NORMAL_SYNC; } @Override @@ -75,6 +87,20 @@ public void operationComplete(ChannelFuture future) throws Exception { }); } + @Override + public void masterDisconnected(Channel channel) { + if (state.equals(REPL_STATE.FAIL_FOR_NOT_CONTINUE)) { + logger.info("[retryOnceForRdbNotContinue][reconnect master]"); + state = REPL_STATE.FRESH_SYNC; + if (replicationObserver != null) { + replicationObserver.onMasterDisconnected(); + } + scheduleReconnect(0); + } else { + super.masterDisconnected(channel); + } + } + @Override protected void doWhenCannotPsync() { try { @@ -93,7 +119,13 @@ protected void psyncFail(Throwable cause) { @Override protected Psync createPsync() { - Psync psync = new RdbOnlyPsync(clientPool, rdbOnlyReplicationStore, scheduled); + Psync psync; + if (state.equals(REPL_STATE.FRESH_SYNC)) { + psync = new FreshRdbOnlyPsync(clientPool, rdbOnlyReplicationStore, scheduled); + } else { + psync = new RdbOnlyPsync(clientPool, rdbOnlyReplicationStore, scheduled); + } + psync.addPsyncObserver(this); psync.addPsyncObserver(redisKeeperServer.createPsyncObserverForRdbOnlyRepl()); return psync; @@ -140,7 +172,19 @@ protected void doOnContinue() { protected void doOnFullSync(long masterRdbOffset) { long firstAvailable = redisMaster.getCurrentReplicationStore().firstAvailableOffset(); if (firstAvailable > masterRdbOffset + 1) { - dumpFail(new PsyncMasterRdbOffsetNotContinuousRuntimeException(masterRdbOffset, firstAvailable)); + if (state.equals(REPL_STATE.NORMAL_SYNC)) { + state = REPL_STATE.FAIL_FOR_NOT_CONTINUE; + try { + logger.info("[retryOnceForRdbNotContinue][resetRdbStore]{}", dumpedRdbStore); + dumpedRdbStore = getRdbDumper().prepareRdbStore(); + rdbOnlyReplicationStore = new RdbOnlyReplicationStore(dumpedRdbStore); + disconnectWithMaster(); + } catch (Exception e) { + dumpFail(new PsyncMasterRdbOffsetNotContinuousRuntimeException(masterRdbOffset, firstAvailable)); + } + } else { + dumpFail(new PsyncMasterRdbOffsetNotContinuousRuntimeException(masterRdbOffset, firstAvailable)); + } } } diff --git a/redis/redis-keeper/src/main/java/com/ctrip/xpipe/redis/keeper/impl/RedisMasterNewRdbDumper.java b/redis/redis-keeper/src/main/java/com/ctrip/xpipe/redis/keeper/impl/RedisMasterNewRdbDumper.java index 27a4338564..4d66472fe0 100644 --- a/redis/redis-keeper/src/main/java/com/ctrip/xpipe/redis/keeper/impl/RedisMasterNewRdbDumper.java +++ b/redis/redis-keeper/src/main/java/com/ctrip/xpipe/redis/keeper/impl/RedisMasterNewRdbDumper.java @@ -41,12 +41,16 @@ public class RedisMasterNewRdbDumper extends AbstractRdbDumper { private final boolean tryRordb; - public RedisMasterNewRdbDumper(RedisMaster redisMaster, RedisKeeperServer redisKeeperServer, boolean tryRordb, + private final boolean freshRdbNeeded; + + public RedisMasterNewRdbDumper(RedisMaster redisMaster, RedisKeeperServer redisKeeperServer, + boolean tryRordb, boolean freshRdbNeeded, NioEventLoopGroup nioEventLoopGroup, ScheduledExecutorService scheduled, KeeperResourceManager resourceManager) { super(redisKeeperServer); this.redisMaster = redisMaster; this.tryRordb = tryRordb; + this.freshRdbNeeded = freshRdbNeeded; this.nioEventLoopGroup = nioEventLoopGroup; this.scheduled = scheduled; this.resourceManager = resourceManager; @@ -87,7 +91,8 @@ protected void releaseResource() { } protected void startRdbOnlyReplication() throws Exception { - rdbonlyRedisMasterReplication = new RdbonlyRedisMasterReplication(redisKeeperServer, redisMaster, tryRordb, nioEventLoopGroup, scheduled, this, resourceManager); + rdbonlyRedisMasterReplication = new RdbonlyRedisMasterReplication(redisKeeperServer, redisMaster, tryRordb, freshRdbNeeded, + nioEventLoopGroup, scheduled, this, resourceManager); rdbonlyRedisMasterReplication.initialize(); rdbonlyRedisMasterReplication.start(); diff --git a/redis/redis-keeper/src/main/java/com/ctrip/xpipe/redis/keeper/store/DefaultReplicationStore.java b/redis/redis-keeper/src/main/java/com/ctrip/xpipe/redis/keeper/store/DefaultReplicationStore.java index c982107694..836cf3f2fe 100644 --- a/redis/redis-keeper/src/main/java/com/ctrip/xpipe/redis/keeper/store/DefaultReplicationStore.java +++ b/redis/redis-keeper/src/main/java/com/ctrip/xpipe/redis/keeper/store/DefaultReplicationStore.java @@ -13,6 +13,7 @@ import com.ctrip.xpipe.redis.core.store.OffsetReplicationProgress; import com.ctrip.xpipe.redis.keeper.store.meta.DefaultMetaStore; import com.ctrip.xpipe.utils.FileUtils; +import com.ctrip.xpipe.utils.VisibleForTesting; import io.netty.buffer.ByteBuf; import org.slf4j.Logger; import org.slf4j.LoggerFactory; @@ -618,4 +619,9 @@ protected Logger getLogger() { return logger; } + @VisibleForTesting + public void setCommandsRetainTimeoutMilli(int timeoutMilli) { + this.commandsRetainTimeoutMilli = timeoutMilli; + } + } diff --git a/redis/redis-keeper/src/test/java/com/ctrip/xpipe/redis/keeper/AbstractFakeRedisTest.java b/redis/redis-keeper/src/test/java/com/ctrip/xpipe/redis/keeper/AbstractFakeRedisTest.java index ce4f2dd254..4778635c7c 100644 --- a/redis/redis-keeper/src/test/java/com/ctrip/xpipe/redis/keeper/AbstractFakeRedisTest.java +++ b/redis/redis-keeper/src/test/java/com/ctrip/xpipe/redis/keeper/AbstractFakeRedisTest.java @@ -183,69 +183,68 @@ protected InMemoryPsync sendInmemoryPsync(String ip, int port, String runid, lon } protected InMemoryPsync sendInmemoryPsync(String ip, int port, String runid, long offset, PsyncObserver psyncObserver) throws Exception { - - SequenceCommandChain chain = new SequenceCommandChain(false); - SimpleObjectPool pool = getXpipeNettyClientKeyedObjectPool().getKeyPool(new DefaultEndPoint(ip, port)); - NettyClient nettyClient = null; - - try{ - nettyClient = pool.borrowObject(); - - SimpleObjectPool clientPool = new FixedObjectPool(nettyClient); - chain.add(new Replconf(clientPool, - ReplConfType.CAPA, scheduled, CAPA.EOF.toString())); - InMemoryPsync psync = new InMemoryPsync(clientPool, runid, offset, scheduled); - chain.add(psync); - - if(psyncObserver != null){ - psync.addPsyncObserver(psyncObserver); - } - psync.addPsyncObserver(new PsyncObserver() { - - private long masterRdbOffset = 0; - @Override - public void reFullSync() { - - } - - @Override - public void onFullSync(long masterRdbOffset) { - - } - - @Override - public void onContinue(String requestReplId, String responseReplId) { - - } - - @Override - public void onKeeperContinue(String replId, long beginOffset) { - - } - - @Override - public void readAuxEnd(RdbStore rdbStore, Map auxMap) { - - } - - @Override - public void endWriteRdb() { - new Replconf(clientPool, ReplConfType.ACK, scheduled, String.valueOf(masterRdbOffset)).execute(); - } - - @Override - public void beginWriteRdb(EofType eofType, String replId, long masterRdbOffset) throws IOException { - this.masterRdbOffset = masterRdbOffset; - } - }); - - chain.execute(); - return psync; - }finally{ + NettyClient nettyClient = pool.borrowObject(); + try { + return sendInmemoryPsync(new FixedObjectPool<>(nettyClient), runid, offset, psyncObserver); + } finally { if(nettyClient != null){ pool.returnObject(nettyClient); } } } + + protected InMemoryPsync sendInmemoryPsync(SimpleObjectPool clientPool, String runid, long offset, PsyncObserver psyncObserver) throws Exception { + + SequenceCommandChain chain = new SequenceCommandChain(false); + chain.add(new Replconf(clientPool, + ReplConfType.CAPA, scheduled, CAPA.EOF.toString())); + InMemoryPsync psync = new InMemoryPsync(clientPool, runid, offset, scheduled); + chain.add(psync); + + if(psyncObserver != null){ + psync.addPsyncObserver(psyncObserver); + } + psync.addPsyncObserver(new PsyncObserver() { + + private long masterRdbOffset = 0; + @Override + public void reFullSync() { + + } + + @Override + public void onFullSync(long masterRdbOffset) { + + } + + @Override + public void onContinue(String requestReplId, String responseReplId) { + + } + + @Override + public void onKeeperContinue(String replId, long beginOffset) { + + } + + @Override + public void readAuxEnd(RdbStore rdbStore, Map auxMap) { + + } + + @Override + public void endWriteRdb() { + new Replconf(clientPool, ReplConfType.ACK, scheduled, String.valueOf(masterRdbOffset)).execute(); + } + + @Override + public void beginWriteRdb(EofType eofType, String replId, long masterRdbOffset) throws IOException { + this.masterRdbOffset = masterRdbOffset; + } + }); + + chain.execute(); + return psync; + } } diff --git a/redis/redis-keeper/src/test/java/com/ctrip/xpipe/redis/keeper/AllTests.java b/redis/redis-keeper/src/test/java/com/ctrip/xpipe/redis/keeper/AllTests.java index eedfa9bec4..55e4bfedc5 100644 --- a/redis/redis-keeper/src/test/java/com/ctrip/xpipe/redis/keeper/AllTests.java +++ b/redis/redis-keeper/src/test/java/com/ctrip/xpipe/redis/keeper/AllTests.java @@ -45,6 +45,7 @@ RedisKeeperServerStateActiveTest.class, RedisKeeperServerStateUnknownTest.class, DefaultRedisMasterReplicationTest.class, + RdbonlyRedisMasterReplicationTest.class, RedisMasterNewRdbDumperTest.class, StateBackupDeadlockTest.class, KeeperContainerServiceTest.class, @@ -66,6 +67,7 @@ InfoHandlerTest.class, ConfigHandlerTest.class, ApplierCommandHandlerTest.class, + FakeRedisRdbOnlyDumpTest.class, DefaultKeeperStatsTest.class, DefaultLeakyBucketTest.class, diff --git a/redis/redis-keeper/src/test/java/com/ctrip/xpipe/redis/keeper/impl/RdbonlyRedisMasterReplicationTest.java b/redis/redis-keeper/src/test/java/com/ctrip/xpipe/redis/keeper/impl/RdbonlyRedisMasterReplicationTest.java index e8c68d84c2..b7f2627f74 100644 --- a/redis/redis-keeper/src/test/java/com/ctrip/xpipe/redis/keeper/impl/RdbonlyRedisMasterReplicationTest.java +++ b/redis/redis-keeper/src/test/java/com/ctrip/xpipe/redis/keeper/impl/RdbonlyRedisMasterReplicationTest.java @@ -2,23 +2,37 @@ import com.ctrip.xpipe.endpoint.DefaultEndPoint; import com.ctrip.xpipe.redis.core.protocal.MASTER_STATE; +import com.ctrip.xpipe.redis.core.protocal.Psync; import com.ctrip.xpipe.redis.core.protocal.cmd.AbstractRedisCommand; +import com.ctrip.xpipe.redis.core.protocal.cmd.FreshRdbOnlyPsync; +import com.ctrip.xpipe.redis.core.protocal.cmd.RdbOnlyPsync; import com.ctrip.xpipe.redis.core.store.DumpedRdbStore; +import com.ctrip.xpipe.redis.core.store.ReplicationStore; import com.ctrip.xpipe.redis.core.store.ReplicationStoreManager; +import com.ctrip.xpipe.redis.keeper.AbstractRedisKeeperContextTest; import com.ctrip.xpipe.redis.keeper.AbstractRedisKeeperTest; import com.ctrip.xpipe.redis.keeper.RedisMaster; import com.ctrip.xpipe.redis.keeper.config.KeeperResourceManager; +import com.ctrip.xpipe.redis.keeper.exception.psync.PsyncMasterRdbOffsetNotContinuousRuntimeException; +import com.ctrip.xpipe.redis.keeper.monitor.KeeperMonitor; +import com.ctrip.xpipe.redis.keeper.monitor.KeeperStats; +import com.ctrip.xpipe.redis.keeper.monitor.MasterStats; +import com.ctrip.xpipe.redis.keeper.monitor.ReplicationStoreStats; +import com.ctrip.xpipe.redis.keeper.ratelimit.LeakyBucketBasedMasterReplicationListener; +import io.netty.channel.Channel; import io.netty.channel.nio.NioEventLoopGroup; import org.junit.Assert; import org.junit.Before; import org.junit.Test; import org.junit.runner.RunWith; import org.mockito.Mock; +import org.mockito.Mockito; import org.mockito.MockitoAnnotations; import org.mockito.junit.MockitoJUnitRunner; import java.io.IOException; import java.util.concurrent.ScheduledExecutorService; +import java.util.concurrent.atomic.AtomicInteger; import static org.mockito.Mockito.*; @@ -27,8 +41,8 @@ *

* Jul 05, 2018 */ -@RunWith(MockitoJUnitRunner.class) -public class RdbonlyRedisMasterReplicationTest extends AbstractRedisKeeperTest { +@RunWith(MockitoJUnitRunner.Silent.class) +public class RdbonlyRedisMasterReplicationTest extends AbstractRedisKeeperContextTest { private RedisMaster keeperRedisMaster; @@ -45,24 +59,40 @@ public class RdbonlyRedisMasterReplicationTest extends AbstractRedisKeeperTest { @Mock private DefaultRedisKeeperServer keeperServer; + @Mock + private KeeperMonitor keeperMonitor; + + @Mock + private MasterStats masterStats; + + @Mock + private ReplicationStoreStats replicationStoreStats; + + @Mock + private ReplicationStore replicationStore; + @Before public void beforeRdbonlyRedisMasterReplicationTest() throws IOException { - MockitoAnnotations.initMocks(this); when(keeperServer.getRedisMaster()).thenReturn(keeperRedisMaster); - } + when(keeperServer.getKeeperMonitor()).thenReturn(keeperMonitor); + when(keeperMonitor.getMasterStats()).thenReturn(masterStats); + when(keeperMonitor.getReplicationStoreStats()).thenReturn(replicationStoreStats); + when(keeperServer.getReplicationStore()).thenReturn(replicationStore); - @Test - public void testTimeoutMilli() throws CreateRdbDumperException { target = new DefaultEndPoint("localhost", randomPort()); this.keeperRedisMaster = new DefaultRedisMaster(keeperServer, target, masterEventLoopGroup, rdbEventLoopGroup, masterConfigEventLoopGroup, replicationStoreManager, scheduled, getRegistry().getComponent(KeeperResourceManager.class)); keeperRedisMaster.setMasterState(MASTER_STATE.REDIS_REPL_CONNECTED); - RedisMasterNewRdbDumper dumper = (RedisMasterNewRdbDumper)keeperRedisMaster.createRdbDumper(false); + } + + @Test + public void testTimeoutMilli() throws CreateRdbDumperException { + RedisMasterNewRdbDumper dumper = (RedisMasterNewRdbDumper)keeperRedisMaster.createRdbDumper(false, false); RdbonlyRedisMasterReplication rdbonlyRedisMasterReplication = new RdbonlyRedisMasterReplication(keeperServer, - keeperRedisMaster, false, masterEventLoopGroup, scheduled, dumper, getRegistry().getComponent(KeeperResourceManager.class)); + keeperRedisMaster, false, false, masterEventLoopGroup, scheduled, dumper, getRegistry().getComponent(KeeperResourceManager.class)); int time = rdbonlyRedisMasterReplication.commandTimeoutMilli(); - Assert.assertEquals(AbstractRedisCommand.DEFAULT_REDIS_COMMAND_TIME_OUT_MILLI, time); + Assert.assertEquals(AbstractRedisCommand.PROXYED_REDIS_CONNECTION_COMMAND_TIME_OUT_MILLI, time); } @Test @@ -72,7 +102,7 @@ public void releaseRdbFileWhenCannotPsync() throws Exception { RdbonlyRedisMasterReplication replication = spy(new RdbonlyRedisMasterReplication( mock(DefaultRedisKeeperServer.class), mock(RedisMaster.class), - false, + false, false, mock(NioEventLoopGroup.class), mock(ScheduledExecutorService.class), mock(RedisMasterNewRdbDumper.class), @@ -85,4 +115,62 @@ public void releaseRdbFileWhenCannotPsync() throws Exception { verify(rdbStore, times(1)).close(); verify(rdbStore, times(1)).destroy(); } + + @Test + public void testFailForNotContinue_retryWithRefreshOnlyOnce() throws Exception { + AtomicInteger reconnectCnt = new AtomicInteger(0); + RedisMasterNewRdbDumper dumper = (RedisMasterNewRdbDumper)keeperRedisMaster.createRdbDumper(false, false); + RdbonlyRedisMasterReplication rdbonlyRedisMasterReplication = new RdbonlyRedisMasterReplication(keeperServer, + keeperRedisMaster, false, false, masterEventLoopGroup, scheduled, dumper, getRegistry().getComponent(KeeperResourceManager.class)) { + @Override + protected void scheduleReconnect(int timeMilli) { + reconnectCnt.incrementAndGet(); + } + }; + Psync psync = rdbonlyRedisMasterReplication.createPsync(); + Assert.assertEquals(RdbOnlyPsync.class, psync.getClass()); + + when(replicationStore.firstAvailableOffset()).thenReturn(120L); + rdbonlyRedisMasterReplication.onFullSync(100); + Assert.assertFalse(dumper.future().isDone()); + + rdbonlyRedisMasterReplication.masterDisconnected(Mockito.mock(Channel.class)); + Assert.assertEquals(1, reconnectCnt.get()); + Assert.assertFalse(dumper.future().isDone()); + + psync = rdbonlyRedisMasterReplication.createPsync(); + Assert.assertEquals(FreshRdbOnlyPsync.class, psync.getClass()); + + when(replicationStore.firstAvailableOffset()).thenReturn(120L); + rdbonlyRedisMasterReplication.onFullSync(100); + Assert.assertTrue(dumper.future().isDone()); + Assert.assertEquals(PsyncMasterRdbOffsetNotContinuousRuntimeException.class, dumper.future().cause().getClass()); + } + + @Test + public void testFailForNotContinue_reconnectMasterOnlyOnce() throws Exception { + AtomicInteger reconnectCnt = new AtomicInteger(0); + RedisMasterNewRdbDumper dumper = (RedisMasterNewRdbDumper)keeperRedisMaster.createRdbDumper(false, false); + RdbonlyRedisMasterReplication rdbonlyRedisMasterReplication = new RdbonlyRedisMasterReplication(keeperServer, + keeperRedisMaster, false, false, masterEventLoopGroup, scheduled, dumper, getRegistry().getComponent(KeeperResourceManager.class)) { + @Override + protected void scheduleReconnect(int timeMilli) { + reconnectCnt.incrementAndGet(); + } + }; + + when(replicationStore.firstAvailableOffset()).thenReturn(120L); + rdbonlyRedisMasterReplication.onFullSync(100); + Assert.assertFalse(dumper.future().isDone()); + + rdbonlyRedisMasterReplication.masterDisconnected(Mockito.mock(Channel.class)); + Assert.assertEquals(1, reconnectCnt.get()); + Assert.assertFalse(dumper.future().isDone()); + + rdbonlyRedisMasterReplication.masterDisconnected(Mockito.mock(Channel.class)); + Assert.assertEquals(1, reconnectCnt.get()); + Assert.assertTrue(dumper.future().isDone()); + Assert.assertNotNull(dumper.future().cause()); + } + } \ No newline at end of file diff --git a/redis/redis-keeper/src/test/java/com/ctrip/xpipe/redis/keeper/impl/RedisMasterNewRdbDumperTest.java b/redis/redis-keeper/src/test/java/com/ctrip/xpipe/redis/keeper/impl/RedisMasterNewRdbDumperTest.java index 5849af815d..236612da5c 100644 --- a/redis/redis-keeper/src/test/java/com/ctrip/xpipe/redis/keeper/impl/RedisMasterNewRdbDumperTest.java +++ b/redis/redis-keeper/src/test/java/com/ctrip/xpipe/redis/keeper/impl/RedisMasterNewRdbDumperTest.java @@ -25,7 +25,7 @@ public void whenRdbOffsetNotContinuous() throws Exception { RedisKeeperServer redisKeeperServer = mock(RedisKeeperServer.class); doNothing().when(redisKeeperServer).resetDefaultReplication(); - RedisMasterNewRdbDumper dumper = spy(new RedisMasterNewRdbDumper(redisMaster, redisKeeperServer, false, mock(NioEventLoopGroup.class), + RedisMasterNewRdbDumper dumper = spy(new RedisMasterNewRdbDumper(redisMaster, redisKeeperServer, false, false, mock(NioEventLoopGroup.class), mock(ScheduledExecutorService.class), mock(KeeperResourceManager.class))); doNothing().when(dumper).startRdbOnlyReplication(); @@ -37,7 +37,7 @@ public void whenRdbOffsetNotContinuous() throws Exception { @Test public void cancel() throws Exception { - RedisMasterNewRdbDumper dumper = spy(new RedisMasterNewRdbDumper(mock(RedisMaster.class), mock(RedisKeeperServer.class), false, mock(NioEventLoopGroup.class), + RedisMasterNewRdbDumper dumper = spy(new RedisMasterNewRdbDumper(mock(RedisMaster.class), mock(RedisKeeperServer.class), false, false, mock(NioEventLoopGroup.class), mock(ScheduledExecutorService.class), mock(KeeperResourceManager.class))); doNothing().when(dumper).doExecute(); diff --git a/redis/redis-keeper/src/test/java/com/ctrip/xpipe/redis/keeper/impl/fakeredis/FakeRedisRdbOnlyDumpTest.java b/redis/redis-keeper/src/test/java/com/ctrip/xpipe/redis/keeper/impl/fakeredis/FakeRedisRdbOnlyDumpTest.java new file mode 100644 index 0000000000..1bb93e1626 --- /dev/null +++ b/redis/redis-keeper/src/test/java/com/ctrip/xpipe/redis/keeper/impl/fakeredis/FakeRedisRdbOnlyDumpTest.java @@ -0,0 +1,47 @@ +package com.ctrip.xpipe.redis.keeper.impl.fakeredis; + +import com.ctrip.xpipe.endpoint.DefaultEndPoint; +import com.ctrip.xpipe.netty.NettyPoolUtil; +import com.ctrip.xpipe.redis.core.protocal.cmd.InMemoryPsync; +import com.ctrip.xpipe.redis.keeper.AbstractFakeRedisTest; +import com.ctrip.xpipe.redis.keeper.RedisKeeperServer; +import com.ctrip.xpipe.redis.keeper.store.DefaultReplicationStore; +import org.junit.Assert; +import org.junit.Test; + +public class FakeRedisRdbOnlyDumpTest extends AbstractFakeRedisTest { + + @Test + public void testRdbOnlyNotClearStoreForNotContinue() throws Exception { + RedisKeeperServer activeDcKeeperServer = startRedisKeeperServerAndConnectToFakeRedis(); + RedisKeeperServer backupDcKeeperServer = startRedisKeeperServer(1, allCommandsSize, 1); + backupDcKeeperServer.getRedisKeeperServerState().becomeActive(new DefaultEndPoint("127.0.0.1", activeDcKeeperServer.getListeningPort())); + ((DefaultReplicationStore)backupDcKeeperServer.getReplicationStore()).setCommandsRetainTimeoutMilli(1); + waitRedisKeeperServerConnected(backupDcKeeperServer); + + InMemoryPsync firstSlave = sendInmemoryPsync("127.0.0.1", backupDcKeeperServer.getListeningPort()); + firstSlave.future().addListener(f -> { + logger.info("[firstSlave] {}", f.isSuccess(), f.cause()); + }); + waitConditionUntilTimeOut(() -> allCommandsSize == firstSlave.getCommands().length); + Assert.assertEquals(1, backupDcKeeperServer.getKeeperMonitor().getKeeperStats().getFullSyncCount()); + + fakeRedisServer.reGenerateRdb(); + backupDcKeeperServer.releaseRdb(); + waitConditionUntilTimeOut(() -> allCommandsSize * 2 == firstSlave.getCommands().length); + activeDcKeeperServer.getReplicationStore().gc(); + backupDcKeeperServer.getReplicationStore().gc(); + Assert.assertTrue(activeDcKeeperServer.getReplicationStore().firstAvailableOffset() < backupDcKeeperServer.getReplicationStore().firstAvailableOffset()); + Assert.assertNotNull(activeDcKeeperServer.getReplicationStore().getMetaStore().dupReplicationStoreMeta().getRdbFile()); + Assert.assertTrue(activeDcKeeperServer.getReplicationStore().getMetaStore().dupReplicationStoreMeta().getRdbLastOffset() + 1 + < backupDcKeeperServer.getReplicationStore().firstAvailableOffset()); + + InMemoryPsync secondSlave = sendInmemoryPsync(NettyPoolUtil.createNettyPool( + new DefaultEndPoint("127.0.0.1", backupDcKeeperServer.getListeningPort())), "?", -1, null); + waitConditionUntilTimeOut(() -> allCommandsSize == secondSlave.getCommands().length); + Assert.assertEquals(2, backupDcKeeperServer.getKeeperMonitor().getKeeperStats().getFullSyncCount()); + Assert.assertFalse(firstSlave.future().isDone()); + Assert.assertFalse(secondSlave.future().isDone()); + } + +} diff --git a/redis/redis-keeper/src/test/java/com/ctrip/xpipe/redis/keeper/ratelimit/LeakyBucketBasedMasterReplicationListenerTest.java b/redis/redis-keeper/src/test/java/com/ctrip/xpipe/redis/keeper/ratelimit/LeakyBucketBasedMasterReplicationListenerTest.java index 8ac80fefaf..acf356c227 100644 --- a/redis/redis-keeper/src/test/java/com/ctrip/xpipe/redis/keeper/ratelimit/LeakyBucketBasedMasterReplicationListenerTest.java +++ b/redis/redis-keeper/src/test/java/com/ctrip/xpipe/redis/keeper/ratelimit/LeakyBucketBasedMasterReplicationListenerTest.java @@ -92,7 +92,20 @@ public void beforeLeakyBucketBasedMasterReplicationListenerTest() { } @Test - public void testOnMasterConnected() { + public void testOnMasterConnected_resetToken() { + LeakyBucket leakyBucket = new DefaultLeakyBucket(1); + when(redisKeeperServer.getRedisKeeperServerState()).thenReturn(new RedisKeeperServerStateActive(redisKeeperServer)); + when(redisMaster.isKeeper()).thenReturn(true); + when(masterStats.lastMasterType()).thenReturn(SERVER_TYPE.REDIS); + when(resourceManager.getLeakyBucket()).thenReturn(leakyBucket); + + listener.onMasterConnected(); + Assert.assertTrue(listener.canSendPsync()); + Assert.assertEquals(0, leakyBucket.references()); + listener.onMasterConnected(); + Assert.assertEquals(1, leakyBucket.references()); + Assert.assertTrue(listener.canSendPsync()); + Assert.assertEquals(0, leakyBucket.references()); } @Test