Skip to content

Commit

Permalink
WIP uploader failure handling improvements
Browse files Browse the repository at this point in the history
Update test_topic_b generated segments to conform to github filesize limit
  • Loading branch information
jeffxiang committed Dec 13, 2024
1 parent 6ec5032 commit 2975531
Show file tree
Hide file tree
Showing 26 changed files with 336 additions and 67 deletions.
2 changes: 1 addition & 1 deletion ts-segment-uploader/pom.xml
Original file line number Diff line number Diff line change
Expand Up @@ -181,7 +181,7 @@
<plugin>
<groupId>org.apache.maven.plugins</groupId>
<artifactId>maven-surefire-plugin</artifactId>
<version>${maven.surefire.version}</version>
<version>3.0.0-M5</version>
<configuration>
<threadCount>1</threadCount>
<reuseForks>false</reuseForks>
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -15,10 +15,15 @@
import java.nio.file.NoSuchFileException;
import java.nio.file.Path;
import java.nio.file.Paths;
import java.nio.file.StandardOpenOption;
import java.nio.file.StandardWatchEventKinds;
import java.nio.file.WatchEvent;
import java.nio.file.WatchKey;
import java.nio.file.WatchService;
import java.time.Instant;
import java.time.LocalDateTime;
import java.time.ZoneId;
import java.time.format.DateTimeFormatter;
import java.util.Arrays;
import java.util.HashMap;
import java.util.HashSet;
Expand Down Expand Up @@ -51,6 +56,7 @@
*/
public class DirectoryTreeWatcher implements Runnable {
private static final Logger LOG = LogManager.getLogger(DirectoryTreeWatcher.class);
private static final Logger FAILED_UPLOADS_LOG = LogManager.getLogger("failed-uploads");
private static final String[] MONITORED_EXTENSIONS = {".timeindex", ".index", ".log"};
private static final Pattern MONITORED_FILE_PATTERN = Pattern.compile("^\\d+(" + String.join("|", MONITORED_EXTENSIONS) + ")$");
private static Map<TopicPartition, String> activeSegment;
Expand All @@ -75,6 +81,7 @@ public class DirectoryTreeWatcher implements Runnable {
private final Object watchKeyMapLock = new Object();
private Thread thread;
private boolean cancelled = false;
private final Object failedUploadFileLock = new Object();

public static void setLeadershipWatcher(LeadershipWatcher suppliedLeadershipWatcher) {
if (leadershipWatcher == null)
Expand Down Expand Up @@ -142,7 +149,8 @@ public void initialize() throws Exception {
LOG.info("Submitting s3UploadHandler loop");
}

private void handleUploadCallback(UploadTask uploadTask, long totalTimeMs, Throwable throwable, int statusCode) {
@VisibleForTesting
protected void handleUploadCallback(UploadTask uploadTask, long totalTimeMs, Throwable throwable, int statusCode) {
TopicPartition topicPartition = uploadTask.getTopicPartition();
MetricRegistryManager.getInstance(config.getMetricsConfiguration()).incrementCounter(
topicPartition.topic(),
Expand Down Expand Up @@ -315,24 +323,65 @@ private void handleUploadException(UploadTask uploadTask, Throwable throwable, T
"broker=" + environmentProvider.brokerId(),
"file=" + uploadTask.getFullFilename()
);
handleFailedUploadAfterAllRetries(uploadTask, throwable, topicPartition);
}
} else if (uploadTask.getTries() < config.getUploadMaxRetries()){
// retry all other errors
retryUpload(uploadTask.retry(), throwable, topicPartition);
} else {
// retry limit reached, upload is still erroring - send a metric
MetricRegistryManager.getInstance(config.getMetricsConfiguration()).incrementCounter(
topicPartition.topic(),
topicPartition.partition(),
UploaderMetrics.UPLOAD_ERROR_METRIC,
"exception=" + throwable.getClass().getName(),
"cluster=" + environmentProvider.clusterId(),
"broker=" + environmentProvider.brokerId(),
"offset=" + uploadTask.getOffset()
);
// retry limit reached, upload still errors
handleFailedUploadAfterAllRetries(uploadTask, throwable, topicPartition);
}
}

/**
 * Handle a failed upload after all retries have been exhausted: log the terminal
 * failure, optionally append a structured record to the configured failure file,
 * and emit an upload-error metric tagged with the exception and broker context.
 *
 * @param uploadTask the upload task that failed
 * @param throwable the exception that caused the failure
 * @param topicPartition the topic partition of the upload task
 */
private void handleFailedUploadAfterAllRetries(UploadTask uploadTask, Throwable throwable, TopicPartition topicPartition) {
    // Pass the throwable to the logger so the stack trace is preserved
    // (previously only the message was logged); %d for the int retry count.
    LOG.error(String.format("Failed to upload file %s to %s after reaching max %d retries.",
        uploadTask.getAbsolutePath(), uploadTask.getUploadDestinationPathString(), uploadTask.getTries()), throwable);
    if (config.getUploadFailureFile() != null) {
        // Serialize writers: upload callbacks fire concurrently and appends to the
        // failure file must not interleave records.
        synchronized (failedUploadFileLock) {
            LOG.info(String.format("Writing failed upload %s --> %s to failure file: %s",
                uploadTask.getAbsolutePath(), uploadTask.getUploadDestinationPathString(), config.getUploadFailureFile()));
            try {
                long timestamp = System.currentTimeMillis();
                LocalDateTime dt = LocalDateTime.ofInstant(Instant.ofEpochMilli(timestamp), ZoneId.systemDefault());
                Files.write(
                    Paths.get(config.getUploadFailureFile()),
                    Arrays.asList(
                        "timestamp: " + timestamp,
                        "human_timestamp: " + dt.format(DateTimeFormatter.ISO_DATE_TIME),
                        "task_num_retries: " + uploadTask.getTries(),
                        "local_path: " + uploadTask.getAbsolutePath(),
                        "destination_path: " + uploadTask.getUploadDestinationPathString(),
                        "exception: " + throwable.getClass().getName(),
                        "message: " + throwable.getMessage(),
                        "-------------------"
                    ),
                    StandardOpenOption.CREATE,
                    StandardOpenOption.APPEND
                );
            } catch (IOException e) {
                // Best-effort persistence: failure-file I/O problems must not
                // break the upload callback path itself.
                LOG.error("Failed to write failed upload to failure file", e);
            }
        }
    }
    MetricRegistryManager.getInstance(config.getMetricsConfiguration()).incrementCounter(
        topicPartition.topic(),
        topicPartition.partition(),
        UploaderMetrics.UPLOAD_ERROR_METRIC,
        "exception=" + throwable.getClass().getName(),
        "cluster=" + environmentProvider.clusterId(),
        "broker=" + environmentProvider.brokerId(),
        "offset=" + uploadTask.getOffset()
    );
}

/**
* Retry the upload of a file that failed to upload.
* @param uploadTask
Expand Down Expand Up @@ -952,6 +1001,7 @@ public static class UploadTask {
private final long sizeBytes;
private int tries = 0;
private long nextRetryNotBeforeTimestamp = -1;
private String uploadDestinationPathString;

public UploadTask(TopicPartition topicPartition, String offset, String fullFilename, Path absolutePath) {
this.topicPartition = topicPartition;
Expand Down Expand Up @@ -1012,6 +1062,14 @@ public long getNextRetryNotBeforeTimestamp() {
return nextRetryNotBeforeTimestamp;
}

/**
 * Returns the fully-qualified upload destination path string for this task
 * (e.g. an s3:// URI), or null if it has not been set yet.
 */
public String getUploadDestinationPathString() {
    return this.uploadDestinationPathString;
}

/**
 * Records the fully-qualified upload destination path string for this task so
 * that later consumers (e.g. upload completion callbacks) can reference it.
 *
 * @param uploadDestinationPathString destination path string to store
 */
public void setUploadDestinationPathString(String uploadDestinationPathString) {
this.uploadDestinationPathString = uploadDestinationPathString;
}

/**
 * Whether this task may be uploaded now: always true for a first attempt,
 * otherwise only once the retry backoff deadline has elapsed.
 */
public boolean isReadyForUpload() {
    if (tries == 0) {
        // Never attempted yet — no backoff applies.
        return true;
    }
    return System.currentTimeMillis() > nextRetryNotBeforeTimestamp;
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -5,15 +5,17 @@
import com.pinterest.kafka.tieredstorage.common.discovery.s3.S3StorageServiceEndpoint;
import org.apache.log4j.LogManager;
import org.apache.log4j.Logger;
import software.amazon.awssdk.services.s3.S3Client;
import software.amazon.awssdk.core.client.config.ClientOverrideConfiguration;
import software.amazon.awssdk.core.exception.ApiCallTimeoutException;
import software.amazon.awssdk.services.s3.S3AsyncClient;
import software.amazon.awssdk.services.s3.model.PutObjectRequest;
import software.amazon.awssdk.services.s3.model.PutObjectResponse;

import java.nio.file.NoSuchFileException;
import java.time.Duration;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;

/**
Expand All @@ -22,24 +24,27 @@
*/
public class MultiThreadedS3FileUploader implements S3FileUploader {
private static final Logger LOG = LogManager.getLogger(MultiThreadedS3FileUploader.class);
private static final int UPLOAD_TIMEOUT_ERROR_CODE = 601;
private static final int UPLOAD_FILE_NOT_FOUND_ERROR_CODE = 602;
private static final int UPLOAD_GENERAL_ERROR_CODE = 603;
protected static final int UPLOAD_TIMEOUT_ERROR_CODE = 601;
protected static final int UPLOAD_FILE_NOT_FOUND_ERROR_CODE = 602;
protected static final int UPLOAD_GENERAL_ERROR_CODE = 603;
private final ExecutorService executorService;
private final StorageServiceEndpointProvider endpointProvider;
private final Heartbeat heartbeat;
private static S3Client s3Client;
private static S3AsyncClient s3AsyncClient;
private final SegmentUploaderConfiguration config;

public MultiThreadedS3FileUploader(StorageServiceEndpointProvider endpointProvider, SegmentUploaderConfiguration config, KafkaEnvironmentProvider environmentProvider) {
this.endpointProvider = endpointProvider;
this.config = config;
if (s3Client == null) {
s3Client = S3Client.builder().build();
ClientOverrideConfiguration overrideConfiguration = ClientOverrideConfiguration.builder()
.apiCallTimeout(Duration.ofMillis(config.getUploadTimeoutMs()))
.build();
if (s3AsyncClient == null) {
s3AsyncClient = S3AsyncClient.builder().overrideConfiguration(overrideConfiguration).build();
}
executorService = Executors.newFixedThreadPool(config.getUploadThreadCount());
heartbeat = new Heartbeat("uploader", config, environmentProvider);
LOG.info("Started MultiThreadedS3FileUploader with threadpool size=" + config.getUploadThreadCount());
LOG.info("Started MultiThreadedS3FileUploader with threadpool size=" + config.getUploadThreadCount() + " and timeout=" + config.getUploadTimeoutMs() + "ms");
}

@Override
Expand All @@ -58,25 +63,35 @@ public void uploadFile(DirectoryTreeWatcher.UploadTask uploadTask, S3UploadCallb

String s3Key = String.format("%s/%s", s3Prefix, subpath);
long queueTime = System.currentTimeMillis();
CompletableFuture<PutObjectResponse> future =
CompletableFuture.supplyAsync(() -> {
PutObjectRequest putObjectRequest = PutObjectRequest.builder()
.bucket(s3Bucket)
.key(s3Key)
// Changing checksum algorithm does not seem to
// have any impact regarding seeing CPU intensive
// sun/security/provider/MD5.implCompress
// that is observed in the flame graph.
//.checksumAlgorithm(ChecksumAlgorithm.CRC32_C)
.build();
return s3Client.putObject(putObjectRequest, uploadTask.getAbsolutePath());
}, executorService).orTimeout(config.getUploadTimeoutMs(), TimeUnit.MILLISECONDS);

LOG.info(String.format("Submitted upload of s3://%s/%s", s3Bucket, s3Key));
PutObjectRequest putObjectRequest = PutObjectRequest.builder()
.bucket(s3Bucket)
.key(s3Key)
// Changing checksum algorithm does not seem to
// have any impact regarding seeing CPU intensive
// sun/security/provider/MD5.implCompress
// that is observed in the flame graph.
//.checksumAlgorithm(ChecksumAlgorithm.CRC32_C)
.build();
CompletableFuture<PutObjectResponse> future;
String uploadPathString = String.format("s3://%s/%s", s3Bucket, s3Key);
uploadTask.setUploadDestinationPathString(uploadPathString); // set the upload destination path so that it can be used in the callback
try {
LOG.info(String.format("Submitting upload of %s --> %s", uploadTask.getAbsolutePath(), uploadPathString));
future = s3AsyncClient.putObject(putObjectRequest, uploadTask.getAbsolutePath());
} catch (Exception e) {
long timeSpentMs = System.currentTimeMillis() - queueTime;
LOG.error(String.format("Caught exception during putObject for %s --> %s in %dms", uploadTask.getAbsolutePath(), uploadPathString, timeSpentMs), e);
int errorCode = UPLOAD_GENERAL_ERROR_CODE;
if (Utils.isAssignableFromRecursive(e, NoSuchFileException.class)) {
errorCode = UPLOAD_FILE_NOT_FOUND_ERROR_CODE;
}
s3UploadCallback.onCompletion(uploadTask, timeSpentMs, e, errorCode);
return;
}
future.whenComplete((putObjectResponse, throwable) -> {
long timeSpentMs = System.currentTimeMillis() - queueTime;
if (throwable != null) {
LOG.error(String.format("Failed upload of s3://%s/%s in %d ms.", s3Bucket, s3Key, timeSpentMs), throwable);
LOG.error(String.format("PutObject failed for %s --> %s in %d ms.", uploadTask.getAbsolutePath(), uploadPathString, timeSpentMs), throwable);

int errorCode = getErrorCode(throwable, putObjectResponse);

Expand All @@ -87,7 +102,7 @@ public void uploadFile(DirectoryTreeWatcher.UploadTask uploadTask, S3UploadCallb
errorCode
);
} else {
LOG.info(String.format("Completed upload of s3://%s/%s in %d ms.", s3Bucket, s3Key, timeSpentMs));
LOG.info(String.format("Completed upload of %s in %d ms.", uploadPathString, timeSpentMs));
s3UploadCallback.onCompletion(uploadTask, timeSpentMs,null, putObjectResponse.sdkHttpResponse().statusCode());
}
});
Expand All @@ -97,7 +112,7 @@ private int getErrorCode(Throwable throwable, PutObjectResponse putObjectRespons
if (throwable == null) {
return putObjectResponse == null ? UPLOAD_GENERAL_ERROR_CODE : putObjectResponse.sdkHttpResponse().statusCode();
}
if (throwable instanceof TimeoutException) {
if (throwable instanceof ApiCallTimeoutException || throwable instanceof TimeoutException) {
return UPLOAD_TIMEOUT_ERROR_CODE;
}
if (throwable instanceof NoSuchFileException) {
Expand All @@ -108,7 +123,7 @@ private int getErrorCode(Throwable throwable, PutObjectResponse putObjectRespons
}

public void stop() {
s3Client.close();
s3AsyncClient.close();
executorService.shutdown();
heartbeat.stop();
}
Expand All @@ -119,7 +134,7 @@ public StorageServiceEndpointProvider getStorageServiceEndpointProvider() {
}

@VisibleForTesting
protected static void overrideS3Client(S3Client newS3Client) {
s3Client = newS3Client;
protected static void overrideS3Client(S3AsyncClient newS3Client) {
s3AsyncClient = newS3Client;
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -61,6 +61,11 @@ public class SegmentUploaderConfiguration {
*/
private static final String UPLOAD_MAX_RETRIES = TS_SEGMENT_UPLOADER_PREFIX + "." + "upload.max.retries";

/**
* File to write failed uploads information to.
*/
private static final String UPLOAD_FAILURE_FILE = TS_SEGMENT_UPLOADER_PREFIX + "." + "upload.failure.file";

/**
* Class name for {@link com.pinterest.kafka.tieredstorage.uploader.leadership.LeadershipWatcher} implementation to use.
*/
Expand Down Expand Up @@ -210,6 +215,15 @@ public int getUploadMaxRetries() {
return Integer.parseInt(properties.getProperty(UPLOAD_MAX_RETRIES, String.valueOf(Defaults.DEFAULT_UPLOAD_MAX_RETRIES)));
}

/**
 * Returns the configured path of the failed-uploads record file, or null when
 * the property is unset. A null return disables failure-file logging
 * (callers null-check before writing).
 */
public String getUploadFailureFile() {
// Intentionally no default value: absence of the property means "disabled".
return properties.getProperty(UPLOAD_FAILURE_FILE);
}

/**
 * Overrides the failed-uploads file path property. Exposed for tests only, so
 * they can point failure records at a temporary file.
 *
 * @param uploadFailureFile path to write failed-upload records to
 */
@VisibleForTesting
protected void setUploadFailureFile(String uploadFailureFile) {
properties.setProperty(UPLOAD_FAILURE_FILE, uploadFailureFile);
}

@VisibleForTesting
protected boolean isInInclusionCache(String topicName) {
return includeTopicsCache.contains(topicName);
Expand Down
Loading

0 comments on commit 2975531

Please sign in to comment.