Skip to content

Commit

Permalink
always require upload of an object if it is not possible to get its tags
Browse files Browse the repository at this point in the history
  • Loading branch information
smiklosovic committed Nov 21, 2023
1 parent eff9951 commit 8cdb00f
Show file tree
Hide file tree
Showing 3 changed files with 36 additions and 29 deletions.
58 changes: 32 additions & 26 deletions src/main/java/com/instaclustr/esop/impl/backup/UploadTracker.java
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,7 @@
import java.io.InputStream;
import java.nio.file.Path;
import java.util.Collection;
import java.util.concurrent.Callable;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.function.Function;

Expand Down Expand Up @@ -116,36 +117,41 @@ public Void call() {

final RemoteObjectReference ref = getRemoteObjectReference(manifestEntry.objectKey);

if (manifestEntry.type != MANIFEST_FILE) {
if (getRetrier(backuper.request.retry).submit(() -> backuper.freshenRemoteObject(manifestEntry, ref) == FRESHENED)) {
logger.info(format("%sskipping the upload of already uploaded file %s",
snapshotTag != null ? "Snapshot " + snapshotTag + " - " : "",
ref.canonicalPath));

state = State.FINISHED;
return null;
// try to refresh object / decide if it is required to upload it
Callable<Boolean> condition = () -> {
try {
return backuper.freshenRemoteObject(manifestEntry, ref) == FRESHENED;
} catch (final Exception ex) {
throw new RetriableException("Failed to refresh remote object" + ref.objectKey, ex);
}
};

if (manifestEntry.type != MANIFEST_FILE && getRetrier(backuper.request.retry).submit(condition)) {
logger.info("{}skipping the upload of alredy uploaded file {}",
snapshotTag != null ? "Snapshot " + snapshotTag + " - " : "",
ref.canonicalPath);

state = State.FINISHED;
return null;
}

getRetrier(backuper.request.retry).submit(new Runnable() {
@Override
public void run() {
try (final InputStream fileStream = new BufferedInputStream(new FileInputStream(manifestEntry.localFile.toFile()))) {
final InputStream rateLimitedStream = getUploadingInputStreamFunction(backuper.request).apply(fileStream);

logger.debug(format("%suploading file '%s' (%s).",
snapshotTag != null ? "Snapshot " + snapshotTag + " - " : "",
manifestEntry.objectKey,
DataSize.bytesToHumanReadable(manifestEntry.size)));
// never encrypt manifest
if (manifestEntry.type == MANIFEST_FILE) {
backuper.uploadFile(manifestEntry, rateLimitedStream, ref);
} else {
backuper.uploadEncryptedFile(manifestEntry, rateLimitedStream, ref);
}
} catch (final Exception ex) {
throw new RetriableException(String.format("Retrying upload of %s", manifestEntry.objectKey), ex);
// do the upload
getRetrier(backuper.request.retry).submit(() -> {
try (final InputStream fileStream = new BufferedInputStream(new FileInputStream(manifestEntry.localFile.toFile()))) {
final InputStream rateLimitedStream = getUploadingInputStreamFunction(backuper.request).apply(fileStream);

logger.debug(format("%suploading file '%s' (%s).",
snapshotTag != null ? "Snapshot " + snapshotTag + " - " : "",
manifestEntry.objectKey,
DataSize.bytesToHumanReadable(manifestEntry.size)));
// never encrypt manifest
if (manifestEntry.type == MANIFEST_FILE) {
backuper.uploadFile(manifestEntry, rateLimitedStream, ref);
} else {
backuper.uploadEncryptedFile(manifestEntry, rateLimitedStream, ref);
}
} catch (final Exception ex) {
throw new RetriableException(String.format("Retrying upload of %s", manifestEntry.objectKey), ex);
}
});

Expand Down
4 changes: 2 additions & 2 deletions src/main/java/com/instaclustr/esop/impl/retry/Retrier.java
Original file line number Diff line number Diff line change
Expand Up @@ -46,7 +46,7 @@ public <T> T submit(final Callable<T> c) throws Exception {
if (attempts > maxAttempts) {
throw ex;
}
logger.error("This operation will be retried: " + ex.getMessage(), ex);
logger.error("This operation will be retried: " + ex.getMessage());
sleeper.sleep();
} else {
throw ex;
Expand All @@ -70,7 +70,7 @@ public void submit(final Runnable r) {
if (attempts > maxAttempts) {
throw ex;
}
logger.error("This operation will be retried: " + ex.getMessage(), ex);
logger.error("This operation will be retried: " + ex.getMessage());
sleeper.sleep();
} else {
throw ex;
Expand Down
3 changes: 2 additions & 1 deletion src/main/java/com/instaclustr/esop/s3/v2/BaseS3Backuper.java
Original file line number Diff line number Diff line change
Expand Up @@ -43,6 +43,7 @@
import software.amazon.awssdk.services.s3.model.NoSuchUploadException;
import software.amazon.awssdk.services.s3.model.ObjectAttributes;
import software.amazon.awssdk.services.s3.model.PutObjectRequest;
import software.amazon.awssdk.services.s3.model.S3Exception;
import software.amazon.awssdk.services.s3.model.SdkPartType;
import software.amazon.awssdk.services.s3.model.StorageClass;
import software.amazon.awssdk.services.s3.model.Tag;
Expand Down Expand Up @@ -107,7 +108,7 @@ public FreshenResult freshenRemoteObject(ManifestEntry manifestEntry, RemoteObje
.key(object.canonicalPath)
.build()).tagSet();
}
catch (NoSuchKeyException ex) {
catch (S3Exception ex) {
return FreshenResult.UPLOAD_REQUIRED;
}

Expand Down

0 comments on commit 8cdb00f

Please sign in to comment.