Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

keeper support freshRdbPsync #922

Open
wants to merge 1 commit into
base: master
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -14,6 +14,7 @@ public interface Psync extends Command<Object>, Closeable {
String FULL_SYNC = "FULLRESYNC";
String PARTIAL_SYNC = "CONTINUE";
long KEEPER_PARTIAL_SYNC_OFFSET = -2;
long KEEPER_FRESH_RDB_SYNC_OFFSET = -3;

void addPsyncObserver(PsyncObserver observer);

Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,22 @@
package com.ctrip.xpipe.redis.core.protocal.cmd;

import com.ctrip.xpipe.api.pool.SimpleObjectPool;
import com.ctrip.xpipe.netty.commands.NettyClient;
import com.ctrip.xpipe.redis.core.store.ReplicationStore;
import com.ctrip.xpipe.tuple.Pair;

import java.util.concurrent.ScheduledExecutorService;

public class FreshRdbOnlyPsync extends RdbOnlyPsync {

public FreshRdbOnlyPsync(SimpleObjectPool<NettyClient> clientPool, ReplicationStore store, ScheduledExecutorService scheduled) {
super(clientPool, store, scheduled);
}

@Override
protected Pair<String, Long> getRequestMasterInfo() {
// psync ? -3
return new Pair<>("?", KEEPER_FRESH_RDB_SYNC_OFFSET);
}

}
Original file line number Diff line number Diff line change
@@ -0,0 +1,108 @@
package com.ctrip.xpipe.redis.core.protocal.cmd;

import com.ctrip.xpipe.api.command.CommandFuture;
import com.ctrip.xpipe.api.command.CommandFutureListener;
import com.ctrip.xpipe.api.endpoint.Endpoint;
import com.ctrip.xpipe.endpoint.DefaultEndPoint;
import com.ctrip.xpipe.netty.NettyPoolUtil;
import com.ctrip.xpipe.redis.core.AbstractRedisTest;
import com.ctrip.xpipe.redis.core.protocal.PsyncObserver;
import com.ctrip.xpipe.redis.core.protocal.protocal.EofType;
import com.ctrip.xpipe.redis.core.redis.RunidGenerator;
import com.ctrip.xpipe.redis.core.store.MetaStore;
import com.ctrip.xpipe.redis.core.store.RdbStore;
import com.ctrip.xpipe.redis.core.store.ReplicationStore;
import com.ctrip.xpipe.redis.core.store.ReplicationStoreManager;
import com.ctrip.xpipe.simpleserver.Server;
import org.junit.Assert;
import org.junit.Before;
import org.junit.Test;
import org.junit.runner.RunWith;
import org.mockito.Mock;
import org.mockito.junit.MockitoJUnitRunner;

import java.io.IOException;
import java.util.Map;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.function.Function;

import static org.mockito.Mockito.*;

@RunWith(MockitoJUnitRunner.Silent.class)
public class FreshRdbOnlyPsyncTest extends AbstractRedisTest {

@Mock
private ReplicationStore replicationStore;

@Mock
private MetaStore metaStore;

@Before
public void beforeDefaultPsyncTest() throws Exception{
when(replicationStore.getMetaStore()).thenReturn(metaStore);
when(replicationStore.getEndOffset()).thenReturn(-1L);

}

@Test
public void testFreshRdbOnlyPsync() throws Exception {
String replId = RunidGenerator.DEFAULT.generateRunid();
int offset = 100;
Server redisServer = startServer(randomPort(), new Function<String, String>() {
@Override
public String apply(String s) {
logger.info("[testFreshRdbOnlyPsync] {}", s);
if (s.trim().equals("psync ? -3")) {
return String.format("+FULLRESYNC %s %d\r\n", replId, offset);
} else {
return "+OK\r\n";
}
}
});
Endpoint redisEndpoint = new DefaultEndPoint("127.0.0.1", redisServer.getPort());
FreshRdbOnlyPsync psync = new FreshRdbOnlyPsync(NettyPoolUtil.createNettyPool(redisEndpoint), replicationStore, scheduled);

CountDownLatch latch = new CountDownLatch(1);
AtomicInteger masetrOffset = new AtomicInteger(0);
psync.addPsyncObserver(new PsyncObserver() {
@Override
public void onFullSync(long masterRdbOffset) {
masetrOffset.set((int)masterRdbOffset);
latch.countDown();
}
@Override
public void reFullSync() {
}
@Override
public void beginWriteRdb(EofType eofType, String replId, long masterRdbOffset) throws IOException {
}
@Override
public void endWriteRdb() {
}
@Override
public void onContinue(String requestReplId, String responseReplId) {
}
@Override
public void onKeeperContinue(String replId, long beginOffset) {
}
@Override
public void readAuxEnd(RdbStore rdbStore, Map<String, String> auxMap) {
}
});
psync.execute().addListener(new CommandFutureListener<Object>() {

@Override
public void operationComplete(CommandFuture<Object> commandFuture) throws Exception {
if(!commandFuture.isSuccess()){
logger.error("[operationComplete]", commandFuture.cause());
}
}
});

latch.await(1000, TimeUnit.SECONDS);
Assert.assertEquals(offset, masetrOffset.get());
}

}
Original file line number Diff line number Diff line change
Expand Up @@ -70,7 +70,11 @@ public static enum PROMOTION_STATE{

}

void fullSyncToSlave(RedisSlave redisSlave) throws IOException;
default void fullSyncToSlave(RedisSlave redisSlave) throws IOException {
fullSyncToSlave(redisSlave, false);
}

void fullSyncToSlave(RedisSlave redisSlave, boolean freshRdbNeeded) throws IOException;

void startIndexing() throws IOException;

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -29,7 +29,7 @@ public interface RedisMaster extends RedisRole, Lifecycle, LifecycleStateAware,

void reconnect();

RdbDumper createRdbDumper(boolean tryRrodb) throws CreateRdbDumperException;
RdbDumper createRdbDumper(boolean tryRrodb, boolean freshRdbNeeded) throws CreateRdbDumperException;

MASTER_STATE getMasterState();

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -46,7 +46,7 @@ public interface RedisSlave extends RedisClient<RedisKeeperServer>, PartialAware
void markPsyncProcessed();

/**
* If psync ? -1, slave start with no data, we should fsync immediately
* If psync ? -1 or psync xxxx 1, slave start with no data, we should fsync immediately
*/
void markColdStart();

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -70,6 +70,10 @@ public void run() {
protected abstract void innerDoHandle(final String[] args, final RedisSlave redisSlave, RedisKeeperServer redisKeeperServer) throws IOException;

protected void doFullSync(RedisSlave redisSlave) {
doFullSync(redisSlave, false);
}

protected void doFullSync(RedisSlave redisSlave, boolean freshRdbNeeded) {

try {
if(logger.isInfoEnabled()){
Expand All @@ -83,7 +87,7 @@ protected void doFullSync(RedisSlave redisSlave) {
String alert = String.format("FULL(M)<-%s[%s]", redisSlave.metaInfo(), redisKeeperServer.getReplId());
EventMonitor.DEFAULT.logAlertEvent(alert);

redisKeeperServer.fullSyncToSlave(redisSlave);
redisKeeperServer.fullSyncToSlave(redisSlave, freshRdbNeeded);
redisKeeperServer.getKeeperMonitor().getKeeperStats().increaseFullSync();
} catch (IOException e) {
logger.error("[doFullSync][close client]" + redisSlave, e);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -13,6 +13,7 @@

import java.io.IOException;

import static com.ctrip.xpipe.redis.core.protocal.Psync.KEEPER_FRESH_RDB_SYNC_OFFSET;
import static com.ctrip.xpipe.redis.core.protocal.Psync.KEEPER_PARTIAL_SYNC_OFFSET;

/**
Expand All @@ -36,14 +37,17 @@ protected void innerDoHandle(final String[] args, final RedisSlave redisSlave, R

Long offsetRequest = Long.valueOf(args[1]);
String replIdRequest = args[0];

if(replIdRequest.equals("?")){
if (replIdRequest.equals("?") || offsetRequest == 1) {
redisSlave.markColdStart();
}

if(replIdRequest.equals("?")){
if (redisSlave.isKeeper() && offsetRequest.equals(KEEPER_PARTIAL_SYNC_OFFSET) && null != keeperRepl.replId()) {
logger.info("[innerDoHandler][keeper psync]");
long continueOffset = keeperRepl.getEndOffset() + 1; // continue from next byte
doKeeperPartialSync(redisSlave, keeperRepl.replId(), continueOffset);
} else if (redisSlave.isKeeper() && offsetRequest.equals(KEEPER_FRESH_RDB_SYNC_OFFSET)) {
doFullSync(redisSlave, true);
} else {
doFullSync(redisSlave);
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -764,17 +764,17 @@ public void promoteSlave(String ip, int port) throws RedisSlavePromotionExceptio
}

@Override
public void fullSyncToSlave(final RedisSlave redisSlave) throws IOException {
public void fullSyncToSlave(final RedisSlave redisSlave, boolean freshRdbNeeded) throws IOException {

logger.info("[fullSyncToSlave]{}, {}", redisSlave, rdbDumper.get());

if (crossRegion.get() && !tryFullSyncToSlaveWithOthers(redisSlave)) {
if (crossRegion.get() && !redisSlave.isKeeper() && !tryFullSyncToSlaveWithOthers(redisSlave)) {
redisSlave.waitForSeqFsync();
return;
}

boolean tryRordb = false; // slave and master all support rordb or not
if (redisSlave.capaOf(CAPA.RORDB) ) {
if (redisSlave.capaOf(CAPA.RORDB)) {
try {
logger.info("[fullSyncToSlave][rordb]{}", redisSlave);
tryRordb = keeperRedisMaster.checkMasterSupportRordb().get();
Expand All @@ -786,28 +786,30 @@ public void fullSyncToSlave(final RedisSlave redisSlave) throws IOException {

if(rdbDumper.get() == null){
logger.info("[fullSyncToSlave][dumper null]{}", redisSlave);
FullSyncListener fullSyncListener = new DefaultFullSyncListener(redisSlave);
FULLSYNC_FAIL_CAUSE failCause = getCurrentReplicationStore().fullSyncIfPossible(fullSyncListener, tryRordb);
if(null != failCause){
if (FULLSYNC_PROGRESS_NOT_SUPPORTED.equals(failCause)) {
if (!freshRdbNeeded) {
FullSyncListener fullSyncListener = new DefaultFullSyncListener(redisSlave);
FULLSYNC_FAIL_CAUSE failCause = getCurrentReplicationStore().fullSyncIfPossible(fullSyncListener, tryRordb);
if (null == failCause) {
return;
} else if (FULLSYNC_PROGRESS_NOT_SUPPORTED.equals(failCause)) {
logger.info("[fullSyncToSlave][progress not support][cancel slave]");
redisSlave.close();
return;
}
}

try{
RdbDumper newDumper = dumpNewRdb(tryRordb);
redisSlave.waitForRdbDumping();
if (newDumper.future().isDone() && !newDumper.future().isSuccess()) {
logger.info("[fullSyncToSlave][new dumper fail immediatelly]");
redisSlave.close();
}
}catch(AbstractRdbDumperException e){
logger.error("[fullSyncToSlave]", e);
if(e.isCancelSlave()){
logger.info("[fullSyncToSlave][cancel slave]");
redisSlave.close();
}
try{
RdbDumper newDumper = dumpNewRdb(tryRordb, freshRdbNeeded);
redisSlave.waitForRdbDumping();
if (newDumper.future().isDone() && !newDumper.future().isSuccess()) {
logger.info("[fullSyncToSlave][new dumper fail immediatelly]");
redisSlave.close();
}
}catch(AbstractRdbDumperException e){
logger.error("[fullSyncToSlave]", e);
if(e.isCancelSlave()){
logger.info("[fullSyncToSlave][cancel slave]");
redisSlave.close();
}
}
}else{
Expand All @@ -816,7 +818,6 @@ public void fullSyncToSlave(final RedisSlave redisSlave) throws IOException {
}

private synchronized boolean tryFullSyncToSlaveWithOthers(RedisSlave redisSlave) {
if (redisSlave.isKeeper()) return true;
if (loadingSlaves.contains(redisSlave)) return true;

int maxConcurrentLoadingSlaves = keeperConfig.getCrossRegionMaxLoadingSlavesCnt();
Expand Down Expand Up @@ -874,8 +875,12 @@ public boolean isStartIndexing() {
}

private RdbDumper dumpNewRdb(boolean tryRordb) throws CreateRdbDumperException, SetRdbDumperException {
return dumpNewRdb(tryRordb, false);
}

private RdbDumper dumpNewRdb(boolean tryRordb, boolean freshRdbNeeded) throws CreateRdbDumperException, SetRdbDumperException {

RdbDumper rdbDumper = keeperRedisMaster.createRdbDumper(tryRordb);
RdbDumper rdbDumper = keeperRedisMaster.createRdbDumper(tryRordb, freshRdbNeeded);
setRdbDumper(rdbDumper);
rdbDumper.execute();
return rdbDumper;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -136,13 +136,13 @@ public PARTIAL_STATE partialState() {


@Override
public RdbDumper createRdbDumper(boolean tryRrodb) throws CreateRdbDumperException {
public RdbDumper createRdbDumper(boolean tryRrodb, boolean freshRdbNeeded) throws CreateRdbDumperException {

if(masterState != MASTER_STATE.REDIS_REPL_CONNECTED){
logger.info("[createRdbDumper][master state not connected, dumper not allowed]{}", redisMasterReplication);
throw new CreateRdbDumperException(this, "master state not connected, dumper not allowed:" + masterState);
}
return new RedisMasterNewRdbDumper(this, redisKeeperServer, tryRrodb, rdbOnlyEventLoopGroup, scheduled, keeperResourceManager);
return new RedisMasterNewRdbDumper(this, redisKeeperServer, tryRrodb, freshRdbNeeded, rdbOnlyEventLoopGroup, scheduled, keeperResourceManager);
}

public MASTER_STATE getMasterState() {
Expand Down
Loading
Loading