From cfec1de719fb710c4c633259e47cf785808f8828 Mon Sep 17 00:00:00 2001 From: Alexander Leyke Date: Fri, 28 Feb 2020 17:46:42 -0500 Subject: [PATCH 1/4] ULTI-422698 - Alex - configurable buffer size, enable import of md5 utils --- build.gradle | 2 +- .../swift/repositories/blobstore/SwiftBlobContainer.java | 2 +- 2 files changed, 2 insertions(+), 2 deletions(-) diff --git a/build.gradle b/build.gradle index 3d57fde..986a807 100644 --- a/build.gradle +++ b/build.gradle @@ -67,11 +67,11 @@ dependencies { compile group: 'com.google.guava', name: 'guava', version: guavaVersion compile group: 'org.javaswift', name: 'joss', version: jossVersion compile group: 'org.apache.httpcomponents', name: 'httpclient', version: httpclientVersion + compile group: 'commons-codec', name: 'commons-codec', version: commonsCodecVersion runtime group: 'org.apache.httpcomponents', name: 'httpcore', version: httpcoreVersion runtime group: 'javax.activation', name: 'activation', version: javaxActivationVersion runtime group: 'org.slf4j', name: 'slf4j-api', version: slf4jVersion runtime group: 'org.slf4j', name: 'slf4j-log4j12', version: slf4jVersion - runtime group: 'commons-codec', name: 'commons-codec', version: commonsCodecVersion runtime group: 'commons-logging', name: 'commons-logging', version: commonsloggingVersion runtime group: 'org.codehaus.jackson', name: 'jackson-mapper-asl', version: jacksonmapperVersion runtime group: 'commons-lang', name: 'commons-lang', version: commonslangVersion diff --git a/src/main/java/org/wikimedia/elasticsearch/swift/repositories/blobstore/SwiftBlobContainer.java b/src/main/java/org/wikimedia/elasticsearch/swift/repositories/blobstore/SwiftBlobContainer.java index ca404a8..75cc996 100644 --- a/src/main/java/org/wikimedia/elasticsearch/swift/repositories/blobstore/SwiftBlobContainer.java +++ b/src/main/java/org/wikimedia/elasticsearch/swift/repositories/blobstore/SwiftBlobContainer.java @@ -351,7 +351,7 @@ public void writeBlob(final String blobName, } private byte[] readAllBytes(InputStream in) throws IOException { - final byte[] buffer = new byte[1024]; + final byte[] buffer = new byte[(int) blobStore.getBufferSizeInBytes()]; ByteArrayOutputStream baos = new ByteArrayOutputStream(buffer.length); int read; From b7aea8586f3ef65f04e58ca334277e7339b00a86 Mon Sep 17 00:00:00 2001 From: Alexander Leyke Date: Mon, 2 Mar 2020 10:18:08 -0500 Subject: [PATCH 2/4] VerifyEtagAndRetry - Alex - intermediate commit --- .../blobstore/SwiftBlobContainer.java | 40 ++++++++++++++++--- 1 file changed, 34 insertions(+), 6 deletions(-) diff --git a/src/main/java/org/wikimedia/elasticsearch/swift/repositories/blobstore/SwiftBlobContainer.java b/src/main/java/org/wikimedia/elasticsearch/swift/repositories/blobstore/SwiftBlobContainer.java index 75cc996..26b1ec2 100644 --- a/src/main/java/org/wikimedia/elasticsearch/swift/repositories/blobstore/SwiftBlobContainer.java +++ b/src/main/java/org/wikimedia/elasticsearch/swift/repositories/blobstore/SwiftBlobContainer.java @@ -16,6 +16,8 @@ package org.wikimedia.elasticsearch.swift.repositories.blobstore; +import org.apache.commons.codec.binary.Hex; +import org.apache.commons.codec.digest.DigestUtils; import org.apache.logging.log4j.LogManager; import org.apache.logging.log4j.Logger; import org.elasticsearch.common.Nullable; @@ -37,13 +39,11 @@ import org.wikimedia.elasticsearch.swift.util.retry.WithTimeout; import org.wikimedia.elasticsearch.swift.repositories.SwiftRepository; -import java.io.BufferedInputStream; -import java.io.ByteArrayOutputStream; -import java.io.IOException; -import java.io.InputStream; +import java.io.*; import java.nio.file.FileAlreadyExistsException; import java.nio.file.NoSuchFileException; +import java.security.MessageDigest; import java.util.ArrayList; import java.util.Collection; import java.util.Collections; @@ -350,18 +350,46 @@ public void writeBlob(final String blobName, internalWriteBlob(blobName, bytes, failIfAlreadyExists); } - private byte[] readAllBytes(InputStream in) throws IOException { + private byte[] readAllBytes(InputStream is) throws IOException { final byte[] buffer = new byte[(int) blobStore.getBufferSizeInBytes()]; ByteArrayOutputStream baos = new ByteArrayOutputStream(buffer.length); int read; - while ((read = in.read(buffer)) != -1) { + while ((read = is.read(buffer)) != -1) { baos.write(buffer, 0, read); } return baos.toByteArray(); } + static class InputStreamEtagResult + { + final InputStream is; + final String etag; + + InputStreamEtagResult(InputStream is, String etag) { + this.is = is; + this.etag = etag; + } + } + + private InputStreamEtagResult calculateEtagFromInputStream(InputStream is) throws IOException { + final byte[] buffer = new byte[(int) blobStore.getBufferSizeInBytes()]; + ByteArrayOutputStream baos = new ByteArrayOutputStream(buffer.length); + int read; + MessageDigest md5 = DigestUtils.getMd5Digest(); + + while ((read = is.read(buffer)) != -1) { + md5.update(buffer, 0, read); + baos.write(buffer, 0, read); + } + + String etag = Hex.encodeHexString(md5.digest()); + InputStream is2 = new ByteArrayInputStream(baos.toByteArray()); + + return new InputStreamEtagResult(null, etag); + } + private void internalWriteBlob(String blobName, byte[] bytes, boolean failIfAlreadyExists) throws IOException { try { IOException exception = withTimeout().retry(retryIntervalS, shortOperationTimeoutS, TimeUnit.SECONDS, () -> { From f6eb45f357302abe71fb4d7fb9e24cf9d019bb55 Mon Sep 17 00:00:00 2001 From: Alexander Leyke Date: Mon, 2 Mar 2020 14:59:26 -0500 Subject: [PATCH 3/4] VerifyEtagAndRetry - Alex - optimized etag verification --- .../blobstore/SwiftBlobContainer.java | 80 +++++++++---------- 1 file changed, 36 insertions(+), 44 deletions(-) diff --git a/src/main/java/org/wikimedia/elasticsearch/swift/repositories/blobstore/SwiftBlobContainer.java b/src/main/java/org/wikimedia/elasticsearch/swift/repositories/blobstore/SwiftBlobContainer.java index 26b1ec2..f05eeff 100644 --- a/src/main/java/org/wikimedia/elasticsearch/swift/repositories/blobstore/SwiftBlobContainer.java +++ b/src/main/java/org/wikimedia/elasticsearch/swift/repositories/blobstore/SwiftBlobContainer.java @@ -16,7 +16,6 @@ package org.wikimedia.elasticsearch.swift.repositories.blobstore; -import org.apache.commons.codec.binary.Hex; import org.apache.commons.codec.digest.DigestUtils; import org.apache.logging.log4j.LogManager; import org.apache.logging.log4j.Logger; @@ -39,11 +38,13 @@ import org.wikimedia.elasticsearch.swift.util.retry.WithTimeout; import org.wikimedia.elasticsearch.swift.repositories.SwiftRepository; -import java.io.*; +import java.io.ByteArrayInputStream; +import java.io.ByteArrayOutputStream; +import java.io.InputStream; +import java.io.IOException; import java.nio.file.FileAlreadyExistsException; import java.nio.file.NoSuchFileException; -import java.security.MessageDigest; import java.util.ArrayList; import java.util.Collection; import java.util.Collections; @@ -305,16 +306,31 @@ private String buildKey(String blobName) { */ @Override public InputStream readBlob(final String blobName) throws IOException { + String objectName = buildKey(blobName); + try { return withTimeout().retry(retryIntervalS, shortOperationTimeoutS, TimeUnit.SECONDS, () -> { try { - InputStream downloadStream = SwiftPerms.execThrows(() -> - blobStore.getContainer().getObject(buildKey(blobName)).downloadObjectAsInputStream() - ); - return new BufferedInputStream(downloadStream, (int) blobStore.getBufferSizeInBytes()); + return SwiftPerms.execThrows(() -> { + StoredObject storedObject = blobStore.getContainer().getObject(objectName); + InputStream rawInputStream = storedObject.downloadObjectAsInputStream(); + int contentLength = (int) storedObject.getContentLength(); + String objectEtag = storedObject.getEtag(); + byte[] objectData = readAllBytes(rawInputStream, contentLength); + String dataEtag = DigestUtils.md5Hex(objectData); + + if (!dataEtag.equals(objectEtag)) { + String message = "cannot read blob [" + objectName + "]: server etag [" + objectEtag + + "] does not match calculated etag [" + dataEtag + "]"; + logger.warn(message); + throw new IOException(message); + } + + return new ByteArrayInputStream(objectData); + }); } catch (NotFoundException e) { - String message = "cannot read blob [" + buildKey(blobName) + "]"; + String message = "cannot read blob [" + objectName + "]"; logger.warn(message); NoSuchFileException e2 = new NoSuchFileException(message); e2.initCause(e); @@ -326,7 +342,7 @@ public InputStream readBlob(final String blobName) throws IOException { throw e; } catch(Exception e) { - throw new BlobStoreException("cannot read blob [" + buildKey(blobName) + "]", e); + throw new BlobStoreException("cannot read blob [" + objectName + "]", e); } } @@ -335,7 +351,8 @@ public void writeBlob(final String blobName, final InputStream in, final long blobSize, boolean failIfAlreadyExists) throws IOException { - byte[] bytes = readAllBytes(in); + // async execution races against the InputStream closed in the caller. Read all data locally. + byte[] bytes = readAllBytes(in, -1); if (executor != null && allowConcurrentIO) { Future task = executor.submit(() -> { @@ -350,44 +367,19 @@ public void writeBlob(final String blobName, internalWriteBlob(blobName, bytes, failIfAlreadyExists); } - private byte[] readAllBytes(InputStream is) throws IOException { - final byte[] buffer = new byte[(int) blobStore.getBufferSizeInBytes()]; - ByteArrayOutputStream baos = new ByteArrayOutputStream(buffer.length); - int read; - - while ((read = is.read(buffer)) != -1) { - baos.write(buffer, 0, read); - } + private byte[] readAllBytes(InputStream in, int sizeHint) throws IOException { + int bufferSize = (int) blobStore.getBufferSizeInBytes(); + final byte[] buffer = new byte[bufferSize]; - return baos.toByteArray(); - } + try (ByteArrayOutputStream baos = new ByteArrayOutputStream(sizeHint > 0 ? sizeHint : bufferSize)) { + int read; - static class InputStreamEtagResult - { - final InputStream is; - final String etag; - - InputStreamEtagResult(InputStream is, String etag) { - this.is = is; - this.etag = etag; - } - } - - private InputStreamEtagResult calculateEtagFromInputStream(InputStream is) throws IOException { - final byte[] buffer = new byte[(int) blobStore.getBufferSizeInBytes()]; - ByteArrayOutputStream baos = new ByteArrayOutputStream(buffer.length); - int read; - MessageDigest md5 = DigestUtils.getMd5Digest(); + while ((read = in.read(buffer)) != -1) { + baos.write(buffer, 0, read); + } - while ((read = is.read(buffer)) != -1) { - md5.update(buffer, 0, read); - baos.write(buffer, 0, read); + return baos.toByteArray(); } - - String etag = Hex.encodeHexString(md5.digest()); - InputStream is2 = new ByteArrayInputStream(baos.toByteArray()); - - return new InputStreamEtagResult(null, etag); } private void internalWriteBlob(String blobName, byte[] bytes, boolean failIfAlreadyExists) throws IOException { From 483dd6906c49b74a92e6bb0fd379140b4d33ee00 Mon Sep 17 00:00:00 2001 From: Alexander Leyke Date: Mon, 9 Mar 2020 10:08:44 -0400 Subject: [PATCH 4/4] VerifyEtagAndRetry - Alex - corrected comment --- .../swift/repositories/blobstore/SwiftBlobContainer.java | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/src/main/java/org/wikimedia/elasticsearch/swift/repositories/blobstore/SwiftBlobContainer.java b/src/main/java/org/wikimedia/elasticsearch/swift/repositories/blobstore/SwiftBlobContainer.java index f05eeff..a147235 100644 --- a/src/main/java/org/wikimedia/elasticsearch/swift/repositories/blobstore/SwiftBlobContainer.java +++ b/src/main/java/org/wikimedia/elasticsearch/swift/repositories/blobstore/SwiftBlobContainer.java @@ -300,7 +300,7 @@ private String buildKey(String blobName) { } /** - * Fetch a given blob into a BufferedInputStream + * Fetch a given blob into memory, verify etag, and return InputStream. * @param blobName The blob name to read * @return a stream */