Skip to content

Commit

Permalink
Merge pull request #3 from UltimateSoftware/VerifyEtagAndRetry
Browse files Browse the repository at this point in the history
Verify etag and retry
  • Loading branch information
alexleyke authored Mar 9, 2020
2 parents fcdba6a + 483dd69 commit ffb0537
Show file tree
Hide file tree
Showing 2 changed files with 39 additions and 19 deletions.
2 changes: 1 addition & 1 deletion build.gradle
Original file line number Diff line number Diff line change
Expand Up @@ -67,11 +67,11 @@ dependencies {
compile group: 'com.google.guava', name: 'guava', version: guavaVersion
compile group: 'org.javaswift', name: 'joss', version: jossVersion
compile group: 'org.apache.httpcomponents', name: 'httpclient', version: httpclientVersion
compile group: 'commons-codec', name: 'commons-codec', version: commonsCodecVersion
runtime group: 'org.apache.httpcomponents', name: 'httpcore', version: httpcoreVersion
runtime group: 'javax.activation', name: 'activation', version: javaxActivationVersion
runtime group: 'org.slf4j', name: 'slf4j-api', version: slf4jVersion
runtime group: 'org.slf4j', name: 'slf4j-log4j12', version: slf4jVersion
runtime group: 'commons-codec', name: 'commons-codec', version: commonsCodecVersion
runtime group: 'commons-logging', name: 'commons-logging', version: commonsloggingVersion
runtime group: 'org.codehaus.jackson', name: 'jackson-mapper-asl', version: jacksonmapperVersion
runtime group: 'commons-lang', name: 'commons-lang', version: commonslangVersion
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -16,6 +16,7 @@

package org.wikimedia.elasticsearch.swift.repositories.blobstore;

import org.apache.commons.codec.digest.DigestUtils;
import org.apache.logging.log4j.LogManager;
import org.apache.logging.log4j.Logger;
import org.elasticsearch.common.Nullable;
Expand All @@ -37,10 +38,10 @@
import org.wikimedia.elasticsearch.swift.util.retry.WithTimeout;
import org.wikimedia.elasticsearch.swift.repositories.SwiftRepository;

import java.io.BufferedInputStream;
import java.io.ByteArrayInputStream;
import java.io.ByteArrayOutputStream;
import java.io.IOException;
import java.io.InputStream;
import java.io.IOException;
import java.nio.file.FileAlreadyExistsException;

import java.nio.file.NoSuchFileException;
Expand Down Expand Up @@ -299,22 +300,37 @@ private String buildKey(String blobName) {
}

/**
* Fetch a given blob into a BufferedInputStream
* Fetch a given blob into memory, verify etag, and return InputStream.
* @param blobName The blob name to read
* @return a stream
*/
@Override
public InputStream readBlob(final String blobName) throws IOException {
String objectName = buildKey(blobName);

try {
return withTimeout().retry(retryIntervalS, shortOperationTimeoutS, TimeUnit.SECONDS, () -> {
try {
InputStream downloadStream = SwiftPerms.execThrows(() ->
blobStore.getContainer().getObject(buildKey(blobName)).downloadObjectAsInputStream()
);
return new BufferedInputStream(downloadStream, (int) blobStore.getBufferSizeInBytes());
return SwiftPerms.execThrows(() -> {
StoredObject storedObject = blobStore.getContainer().getObject(objectName);
InputStream rawInputStream = storedObject.downloadObjectAsInputStream();
int contentLength = (int) storedObject.getContentLength();
String objectEtag = storedObject.getEtag();
byte[] objectData = readAllBytes(rawInputStream, contentLength);
String dataEtag = DigestUtils.md5Hex(objectData);

if (!dataEtag.equals(objectEtag)) {
String message = "cannot read blob [" + objectName + "]: server etag [" + objectEtag +
"] does not match calculated etag [" + dataEtag + "]";
logger.warn(message);
throw new IOException(message);
}

return new ByteArrayInputStream(objectData);
});
}
catch (NotFoundException e) {
String message = "cannot read blob [" + buildKey(blobName) + "]";
String message = "cannot read blob [" + objectName + "]";
logger.warn(message);
NoSuchFileException e2 = new NoSuchFileException(message);
e2.initCause(e);
Expand All @@ -326,7 +342,7 @@ public InputStream readBlob(final String blobName) throws IOException {
throw e;
}
catch(Exception e) {
throw new BlobStoreException("cannot read blob [" + buildKey(blobName) + "]", e);
throw new BlobStoreException("cannot read blob [" + objectName + "]", e);
}
}

Expand All @@ -335,7 +351,8 @@ public void writeBlob(final String blobName,
final InputStream in,
final long blobSize,
boolean failIfAlreadyExists) throws IOException {
byte[] bytes = readAllBytes(in);
// async execution races against the InputStream closed in the caller. Read all data locally.
byte[] bytes = readAllBytes(in, -1);

if (executor != null && allowConcurrentIO) {
Future<Void> task = executor.submit(() -> {
Expand All @@ -350,16 +367,19 @@ public void writeBlob(final String blobName,
internalWriteBlob(blobName, bytes, failIfAlreadyExists);
}

private byte[] readAllBytes(InputStream in) throws IOException {
final byte[] buffer = new byte[1024];
ByteArrayOutputStream baos = new ByteArrayOutputStream(buffer.length);
int read;
private byte[] readAllBytes(InputStream in, int sizeHint) throws IOException {
int bufferSize = (int) blobStore.getBufferSizeInBytes();
final byte[] buffer = new byte[bufferSize];

while ((read = in.read(buffer)) != -1) {
baos.write(buffer, 0, read);
}
try (ByteArrayOutputStream baos = new ByteArrayOutputStream(sizeHint > 0 ? sizeHint : bufferSize)) {
int read;

return baos.toByteArray();
while ((read = in.read(buffer)) != -1) {
baos.write(buffer, 0, read);
}

return baos.toByteArray();
}
}

private void internalWriteBlob(String blobName, byte[] bytes, boolean failIfAlreadyExists) throws IOException {
Expand Down

0 comments on commit ffb0537

Please sign in to comment.