Skip to content

Commit

Permalink
Merge pull request elastic#15475 from s1monw/beef_up_translog_tests
Browse files Browse the repository at this point in the history
Beef up TranslogTests with concurrent fatal exceptions test
  • Loading branch information
s1monw committed Dec 16, 2015
1 parent 2b8f27e commit 6abf5d8
Show file tree
Hide file tree
Showing 2 changed files with 197 additions and 55 deletions.
36 changes: 21 additions & 15 deletions core/src/main/java/org/elasticsearch/index/translog/Translog.java
Original file line number Diff line number Diff line change
Expand Up @@ -162,7 +162,7 @@ public Translog(TranslogConfig config) throws IOException {

try {
if (translogGeneration != null) {
final Checkpoint checkpoint = Checkpoint.read(location.resolve(CHECKPOINT_FILE_NAME));
final Checkpoint checkpoint = readCheckpoint();
this.recoveredTranslogs = recoverFromFiles(translogGeneration, checkpoint);
if (recoveredTranslogs.isEmpty()) {
throw new IllegalStateException("at least one reader must be recovered");
Expand Down Expand Up @@ -519,13 +519,7 @@ public Location add(Operation operation) throws IOException {
return location;
}
} catch (AlreadyClosedException | IOException ex) {
if (current.getTragicException() != null) {
try {
close();
} catch (Exception inner) {
ex.addSuppressed(inner);
}
}
closeOnTragicEvent(ex);
throw ex;
} catch (Throwable e) {
throw new TranslogException(shardId, "Failed to write operation [" + operation + "]", e);
Expand Down Expand Up @@ -605,13 +599,7 @@ public void sync() throws IOException {
current.sync();
}
} catch (AlreadyClosedException | IOException ex) {
if (current.getTragicException() != null) {
try {
close();
} catch (Exception inner) {
ex.addSuppressed(inner);
}
}
closeOnTragicEvent(ex);
throw ex;
}
}
Expand Down Expand Up @@ -643,10 +631,23 @@ public boolean ensureSynced(Location location) throws IOException {
ensureOpen();
return current.syncUpTo(location.translogLocation + location.size);
}
} catch (AlreadyClosedException | IOException ex) {
closeOnTragicEvent(ex);
throw ex;
}
return false;
}

private void closeOnTragicEvent(Throwable ex) {
if (current.getTragicException() != null) {
try {
close();
} catch (Exception inner) {
ex.addSuppressed(inner);
}
}
}

/**
* return stats
*/
Expand Down Expand Up @@ -1882,4 +1883,9 @@ public Throwable getTragicException() {
return current.getTragicException();
}

/** Reads and returns the current checkpoint */
final Checkpoint readCheckpoint() throws IOException {
return Checkpoint.read(location.resolve(CHECKPOINT_FILE_NAME));
}

}
216 changes: 176 additions & 40 deletions core/src/test/java/org/elasticsearch/index/translog/TranslogTests.java
Original file line number Diff line number Diff line change
Expand Up @@ -25,6 +25,7 @@
import org.apache.lucene.mockfile.FilterFileChannel;
import org.apache.lucene.store.AlreadyClosedException;
import org.apache.lucene.store.ByteArrayDataOutput;
import org.apache.lucene.store.MockDirectoryWrapper;
import org.apache.lucene.util.IOUtils;
import org.apache.lucene.util.LineFileDocs;
import org.apache.lucene.util.LuceneTestCase;
Expand Down Expand Up @@ -1426,11 +1427,11 @@ private static class TranslogThread extends Thread {
private final CountDownLatch downLatch;
private final int opsPerThread;
private final int threadId;
private final BlockingQueue<LocationOperation> writtenOperations;
private final Collection<LocationOperation> writtenOperations;
private final Throwable[] threadExceptions;
private final Translog translog;

public TranslogThread(Translog translog, CountDownLatch downLatch, int opsPerThread, int threadId, BlockingQueue<LocationOperation> writtenOperations, Throwable[] threadExceptions) {
public TranslogThread(Translog translog, CountDownLatch downLatch, int opsPerThread, int threadId, Collection<LocationOperation> writtenOperations, Throwable[] threadExceptions) {
this.translog = translog;
this.downLatch = downLatch;
this.opsPerThread = opsPerThread;
Expand Down Expand Up @@ -1466,76 +1467,58 @@ public void run() {
throw new ElasticsearchException("not supported op type");
}

Translog.Location loc = translog.add(op);
Translog.Location loc = add(op);
writtenOperations.add(new LocationOperation(op, loc));
afterAdd();
}
} catch (Throwable t) {
threadExceptions[threadId] = t;
}
}

protected Translog.Location add(Translog.Operation op) throws IOException {
return translog.add(op);
}

protected void afterAdd() throws IOException {}
}

public void testFailFlush() throws IOException {
Path tempDir = createTempDir();
final AtomicBoolean simulateDiskFull = new AtomicBoolean();
final AtomicBoolean fail = new AtomicBoolean();
TranslogConfig config = getTranslogConfig(tempDir);
Translog translog = new Translog(config) {
@Override
TranslogWriter.ChannelFactory getChannelFactory() {
final TranslogWriter.ChannelFactory factory = super.getChannelFactory();

return new TranslogWriter.ChannelFactory() {
@Override
public FileChannel open(Path file) throws IOException {
FileChannel channel = factory.open(file);
return new FilterFileChannel(channel) {

@Override
public int write(ByteBuffer src) throws IOException {
if (simulateDiskFull.get()) {
if (src.limit() > 1) {
final int pos = src.position();
final int limit = src.limit();
src.limit(limit / 2);
super.write(src);
src.position(pos);
src.limit(limit);
throw new IOException("__FAKE__ no space left on device");
}
}
return super.write(src);
}
};
}
};
}
};
Translog translog = getFailableTranslog(fail, config);

List<Translog.Location> locations = new ArrayList<>();
int opsSynced = 0;
int opsAdded = 0;
boolean failed = false;
while(failed == false) {
try {
locations.add(translog.add(new Translog.Index("test", "" + opsSynced, Integer.toString(opsSynced).getBytes(Charset.forName("UTF-8")))));
opsAdded++;
translog.sync();
opsSynced++;
} catch (MockDirectoryWrapper.FakeIOException ex) {
failed = true;
assertFalse(translog.isOpen());
} catch (IOException ex) {
failed = true;
assertFalse(translog.isOpen());
assertEquals("__FAKE__ no space left on device", ex.getMessage());
}
simulateDiskFull.set(randomBoolean());
fail.set(randomBoolean());
}
simulateDiskFull.set(false);
fail.set(false);
if (randomBoolean()) {
try {
locations.add(translog.add(new Translog.Index("test", "" + opsSynced, Integer.toString(opsSynced).getBytes(Charset.forName("UTF-8")))));
fail("we are already closed");
} catch (AlreadyClosedException ex) {
assertNotNull(ex.getCause());
assertEquals(ex.getCause().getMessage(), "__FAKE__ no space left on device");
if (ex.getCause() instanceof MockDirectoryWrapper.FakeIOException) {
assertNull(ex.getCause().getMessage());
} else {
assertEquals(ex.getCause().getMessage(), "__FAKE__ no space left on device");
}
}

}
Expand Down Expand Up @@ -1592,4 +1575,157 @@ public void testTranslogOpsCountIsCorrect() throws IOException {
}
}
}

public void testFatalIOExceptionsWhileWritingConcurrently() throws IOException, InterruptedException {
Path tempDir = createTempDir();
final AtomicBoolean fail = new AtomicBoolean(false);

TranslogConfig config = getTranslogConfig(tempDir);
final Translog translog = getFailableTranslog(fail, config);

final int threadCount = randomIntBetween(1, 5);
Thread[] threads = new Thread[threadCount];
final Throwable[] threadExceptions = new Throwable[threadCount];
final CountDownLatch downLatch = new CountDownLatch(1);
final CountDownLatch added = new CountDownLatch(randomIntBetween(10, 100));
List<LocationOperation> writtenOperations = Collections.synchronizedList(new ArrayList<LocationOperation>());
for (int i = 0; i < threadCount; i++) {
final int threadId = i;
threads[i] = new TranslogThread(translog, downLatch, 200, threadId, writtenOperations, threadExceptions) {
@Override
protected Translog.Location add(Translog.Operation op) throws IOException {
Translog.Location add = super.add(op);
added.countDown();
return add;
}

@Override
protected void afterAdd() throws IOException {
if (randomBoolean()) {
translog.sync();
}
}
};
threads[i].setDaemon(true);
threads[i].start();
}
downLatch.countDown();
added.await();
try (Translog.View view = translog.newView()) {
// this holds a reference to the current tlog channel such that it's not closed
// if we hit a tragic event. this is important to ensure that asserts inside the Translog#add doesn't trip
// otherwise our assertions here are off by one sometimes.
fail.set(true);
for (int i = 0; i < threadCount; i++) {
threads[i].join();
}
boolean atLeastOneFailed = false;
for (Throwable ex : threadExceptions) {
if (ex != null) {
atLeastOneFailed = true;
break;
}
}
if (atLeastOneFailed == false) {
try {
boolean syncNeeded = translog.syncNeeded();
translog.close();
assertFalse("should have failed if sync was needed", syncNeeded);
} catch (IOException ex) {
// boom now we failed
}
}
Collections.sort(writtenOperations, new Comparator<LocationOperation>() {
@Override
public int compare(LocationOperation a, LocationOperation b) {
return a.location.compareTo(b.location);
}
});
assertFalse(translog.isOpen());
final Checkpoint checkpoint = Checkpoint.read(config.getTranslogPath().resolve(Translog.CHECKPOINT_FILE_NAME));
Iterator<LocationOperation> iterator = writtenOperations.iterator();
while (iterator.hasNext()) {
LocationOperation next = iterator.next();
if (checkpoint.offset < (next.location.translogLocation + next.location.size)) {
// drop all that haven't been synced
iterator.remove();
}
}
config.setTranslogGeneration(translog.getGeneration());
try (Translog tlog = new Translog(config)) {
try (Translog.Snapshot snapshot = tlog.newSnapshot()) {
if (writtenOperations.size() != snapshot.estimatedTotalOperations()) {
for (int i = 0; i < threadCount; i++) {
if (threadExceptions[i] != null)
threadExceptions[i].printStackTrace();
}
}
assertEquals(writtenOperations.size(), snapshot.estimatedTotalOperations());
for (int i = 0; i < writtenOperations.size(); i++) {
assertEquals("expected operation" + i + " to be in the previous translog but wasn't", tlog.currentFileGeneration() - 1, writtenOperations.get(i).location.generation);
Translog.Operation next = snapshot.next();
assertNotNull("operation " + i + " must be non-null", next);
assertEquals(next, writtenOperations.get(i).operation);
}
}
}
}
}

private Translog getFailableTranslog(final AtomicBoolean fail, final TranslogConfig config) throws IOException {
return new Translog(config) {
@Override
TranslogWriter.ChannelFactory getChannelFactory() {
final TranslogWriter.ChannelFactory factory = super.getChannelFactory();

return new TranslogWriter.ChannelFactory() {
@Override
public FileChannel open(Path file) throws IOException {
FileChannel channel = factory.open(file);
return new ThrowingFileChannel(fail, randomBoolean(), channel);
}
};
}
};
}

public static class ThrowingFileChannel extends FilterFileChannel {
private final AtomicBoolean fail;
private final boolean partialWrite;

public ThrowingFileChannel(AtomicBoolean fail, boolean partialWrite, FileChannel delegate) {
super(delegate);
this.fail = fail;
this.partialWrite = partialWrite;
}

@Override
public long write(ByteBuffer[] srcs, int offset, int length) throws IOException {
throw new UnsupportedOperationException();
}

@Override
public int write(ByteBuffer src, long position) throws IOException {
throw new UnsupportedOperationException();
}


public int write(ByteBuffer src) throws IOException {
if (fail.get()) {
if (partialWrite) {
if (src.limit() > 1) {
final int pos = src.position();
final int limit = src.limit();
src.limit(limit / 2);
super.write(src);
src.position(pos);
src.limit(limit);
throw new IOException("__FAKE__ no space left on device");
}
}
throw new MockDirectoryWrapper.FakeIOException();
}
return super.write(src);
}
}
}

0 comments on commit 6abf5d8

Please sign in to comment.