diff --git a/ts-segment-uploader/src/main/java/com/pinterest/kafka/tieredstorage/uploader/dlq/LocalFileDeadLetterQueueHandler.java b/ts-segment-uploader/src/main/java/com/pinterest/kafka/tieredstorage/uploader/dlq/LocalFileDeadLetterQueueHandler.java index 665de36..c21074c 100644 --- a/ts-segment-uploader/src/main/java/com/pinterest/kafka/tieredstorage/uploader/dlq/LocalFileDeadLetterQueueHandler.java +++ b/ts-segment-uploader/src/main/java/com/pinterest/kafka/tieredstorage/uploader/dlq/LocalFileDeadLetterQueueHandler.java @@ -19,6 +19,9 @@ import java.util.concurrent.CompletableFuture; import java.util.concurrent.Future; +/** + * A {@link DeadLetterQueueHandler} that writes failed uploads to a local file with human-readable formatting. + */ public class LocalFileDeadLetterQueueHandler extends DeadLetterQueueHandler { private static final Logger LOG = LogManager.getLogger(LocalFileDeadLetterQueueHandler.class); diff --git a/ts-segment-uploader/src/main/java/com/pinterest/kafka/tieredstorage/uploader/dlq/S3LocalExecutableDeadLetterQueueHandler.java b/ts-segment-uploader/src/main/java/com/pinterest/kafka/tieredstorage/uploader/dlq/S3LocalExecutableDeadLetterQueueHandler.java index 7f9a3da..4cbb2ff 100644 --- a/ts-segment-uploader/src/main/java/com/pinterest/kafka/tieredstorage/uploader/dlq/S3LocalExecutableDeadLetterQueueHandler.java +++ b/ts-segment-uploader/src/main/java/com/pinterest/kafka/tieredstorage/uploader/dlq/S3LocalExecutableDeadLetterQueueHandler.java @@ -19,6 +19,10 @@ import java.util.concurrent.CompletableFuture; import java.util.concurrent.Future; +/** + * A {@link DeadLetterQueueHandler} that writes failed uploads to a local executable file in a format + * that can be run as-is to retry the uploads. + */ public class S3LocalExecutableDeadLetterQueueHandler extends DeadLetterQueueHandler { private static final Logger LOG = LogManager.getLogger(S3LocalExecutableDeadLetterQueueHandler.class); diff --git a/ts-segment-uploader/src/test/java/com/pinterest/kafka/tieredstorage/uploader/TestDirectoryTreeWatcher.java b/ts-segment-uploader/src/test/java/com/pinterest/kafka/tieredstorage/uploader/TestDirectoryTreeWatcher.java index 620d1fa..bd38c91 100644 --- a/ts-segment-uploader/src/test/java/com/pinterest/kafka/tieredstorage/uploader/TestDirectoryTreeWatcher.java +++ b/ts-segment-uploader/src/test/java/com/pinterest/kafka/tieredstorage/uploader/TestDirectoryTreeWatcher.java @@ -184,6 +184,9 @@ void testExponentialBackoffRetries() throws InterruptedException { } } + /** + * Test that retry exhaustion calls the DLQ handler + */ @Test void testRetryExhaustion() throws Exception { // override s3AsyncClient to have a very short timeout diff --git a/ts-segment-uploader/src/test/java/com/pinterest/kafka/tieredstorage/uploader/TestMultiThreadedS3FileUploader.java b/ts-segment-uploader/src/test/java/com/pinterest/kafka/tieredstorage/uploader/TestMultiThreadedS3FileUploader.java index 3384965..114842f 100644 --- a/ts-segment-uploader/src/test/java/com/pinterest/kafka/tieredstorage/uploader/TestMultiThreadedS3FileUploader.java +++ b/ts-segment-uploader/src/test/java/com/pinterest/kafka/tieredstorage/uploader/TestMultiThreadedS3FileUploader.java @@ -148,6 +148,9 @@ public void onCompletion(DirectoryTreeWatcher.UploadTask uploadTask, long totalT }); } + /** + * Ensure that an upload that times out returns the correct error code and exception + */ @Test void testTimeoutUpload() throws IOException { // override s3AsyncClient to have a very short timeout diff --git a/ts-segment-uploader/src/test/java/com/pinterest/kafka/tieredstorage/uploader/dlq/TestS3LocalExecutableDeadLetterQueueHandler.java b/ts-segment-uploader/src/test/java/com/pinterest/kafka/tieredstorage/uploader/dlq/TestS3LocalExecutableDeadLetterQueueHandler.java index 1f14d1f..06e278c 100644 --- a/ts-segment-uploader/src/test/java/com/pinterest/kafka/tieredstorage/uploader/dlq/TestS3LocalExecutableDeadLetterQueueHandler.java +++ b/ts-segment-uploader/src/test/java/com/pinterest/kafka/tieredstorage/uploader/dlq/TestS3LocalExecutableDeadLetterQueueHandler.java @@ -25,6 +25,9 @@ public class TestS3LocalExecutableDeadLetterQueueHandler extends TestBase { + /** + * Test the handler by having it send many tasks concurrently to a local executable file. + */ @Test void testConcurrentSend() throws IOException, ExecutionException, InterruptedException { String filePath = "/tmp/s3-local-executable-dlq-handler-test.sh";