From eb85843479d8e581aac46977f9c75601918fdca6 Mon Sep 17 00:00:00 2001 From: R Vadai Date: Wed, 29 Mar 2017 19:01:38 +0100 Subject: [PATCH 01/11] Java integration tests --- .gitignore | 3 + docker-compose.yml | 55 +++ example-connect-s3-sink.properties | 7 - example-connect-worker.properties | 37 -- pom.xml | 21 +- run-integration-tests.sh | 43 +++ .../kafka_connect_s3/ConnectorIT.java | 199 ++++++++++ src/test/resources/test-connector.json | 13 + system_test/.gitignore | 2 - system_test/README.md | 68 ---- system_test/run.py | 344 ------------------ system_test/system-test-s3-sink.properties | 9 - system_test/system-test-worker.properties | 37 -- 13 files changed, 333 insertions(+), 505 deletions(-) create mode 100644 docker-compose.yml delete mode 100644 example-connect-s3-sink.properties delete mode 100644 example-connect-worker.properties create mode 100755 run-integration-tests.sh create mode 100644 src/test/java/com/deviantart/kafka_connect_s3/ConnectorIT.java create mode 100644 src/test/resources/test-connector.json delete mode 100644 system_test/.gitignore delete mode 100644 system_test/README.md delete mode 100644 system_test/run.py delete mode 100644 system_test/system-test-s3-sink.properties delete mode 100644 system_test/system-test-worker.properties diff --git a/.gitignore b/.gitignore index 7c55bd9..bb2f764 100644 --- a/.gitignore +++ b/.gitignore @@ -1,6 +1,9 @@ local-* # Created by https://www.gitignore.io/api/maven +.idea +*.iml + ### Maven ### target/ pom.xml.tag diff --git a/docker-compose.yml b/docker-compose.yml new file mode 100644 index 0000000..43e8fee --- /dev/null +++ b/docker-compose.yml @@ -0,0 +1,55 @@ +--- +version: '2' +services: + zookeeper: + image: confluentinc/cp-zookeeper + hostname: zookeeper + ports: + - '2181:2181' + environment: + ZOOKEEPER_CLIENT_PORT: 2181 + ZOOKEEPER_TICK_TIME: 2000 + + broker: + image: confluentinc/cp-kafka + hostname: broker + depends_on: + - zookeeper + ports: + - '9092:9092' + environment: + - KAFKA_BROKER_ID=1 + - KAFKA_ZOOKEEPER_CONNECT=zookeeper:2181 + - KAFKA_LISTENERS=PLAINTEXT://0.0.0.0:9092 + - KAFKA_ADVERTISED_LISTENERS + + fakes3: + image: lphoward/fake-s3 + hostname: fakes3 + ports: + - '4569:4569' + + connect: + image: confluentinc/cp-kafka-connect + hostname: connect + depends_on: + - zookeeper + - broker + ports: + - "8083:8083" + environment: + CONNECT_BOOTSTRAP_SERVERS: 'broker:9092' + CONNECT_REST_ADVERTISED_HOST_NAME: connect + CONNECT_REST_PORT: 8083 + CONNECT_GROUP_ID: compose-connect-group + CONNECT_CONFIG_STORAGE_TOPIC: docker-connect-configs + CONNECT_OFFSET_STORAGE_TOPIC: docker-connect-offsets + CONNECT_STATUS_STORAGE_TOPIC: docker-connect-status + CONNECT_KEY_CONVERTER: org.apache.kafka.connect.storage.StringConverter + CONNECT_VALUE_CONVERTER: org.apache.kafka.connect.storage.StringConverter + CONNECT_INTERNAL_KEY_CONVERTER: org.apache.kafka.connect.json.JsonConverter + CONNECT_INTERNAL_VALUE_CONVERTER: org.apache.kafka.connect.json.JsonConverter + CONNECT_ZOOKEEPER_CONNECT: 'zookeeper:2181' + CONNECT_CONSUMER_MAX_POLL_INTERVAL_MS: 1000 + volumes: + - './target:/etc/kafka-connect/jars' diff --git a/example-connect-s3-sink.properties b/example-connect-s3-sink.properties deleted file mode 100644 index c816616..0000000 --- a/example-connect-s3-sink.properties +++ /dev/null @@ -1,7 +0,0 @@ -name=s3-sink -connector.class=com.deviantart.kafka_connect_s3.S3SinkConnector -tasks.max=1 -topics=test -s3.bucket=connect-test -s3.prefix=connect-test -local.buffer.dir=/tmp/kafka-connect-s3.buffer \ No newline at end 
of file diff --git a/example-connect-worker.properties b/example-connect-worker.properties deleted file mode 100644 index 0e6f49e..0000000 --- a/example-connect-worker.properties +++ /dev/null @@ -1,37 +0,0 @@ -# Licensed to the Apache Software Foundation (ASF) under one or more -# contributor license agreements. See the NOTICE file distributed with -# this work for additional information regarding copyright ownership. -# The ASF licenses this file to You under the Apache License, Version 2.0 -# (the "License"); you may not use this file except in compliance with -# the License. You may obtain a copy of the License at -# -# http://www.apache.org/licenses/LICENSE-2.0 -# -# Unless required by applicable law or agreed to in writing, software -# distributed under the License is distributed on an "AS IS" BASIS, -# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. -# See the License for the specific language governing permissions and -# limitations under the License. - -# These are defaults. This file just demonstrates how to override some settings. -bootstrap.servers=localhost:9092 - -# The converters specify the format of data in Kafka and how to translate it into Connect data. Every Connect user will -# need to configure these based on the format they want their data in when loaded from or stored into Kafka -key.converter=org.apache.kafka.connect.storage.StringConverter -value.converter=org.apache.kafka.connect.storage.StringConverter - -# The internal converter used for offsets and config data is configurable and must be specified, but most users will -# always want to use the built-in default. Offset and config data is never visible outside of Copcyat in this format. -internal.key.converter=org.apache.kafka.connect.json.JsonConverter -internal.value.converter=org.apache.kafka.connect.json.JsonConverter -internal.key.converter.schemas.enable=true -internal.value.converter.schemas.enable=true - -offset.storage.file.filename=/tmp/connect.offsets - -# Flush much faster than normal, which is useful for testing/debugging -offset.flush.interval.ms=5000 - -# Reload metadata faster too so consumer picks up new topics -consumer.metadata.max.age.ms=10000 \ No newline at end of file diff --git a/pom.xml b/pom.xml index 915acb4..4a97f85 100644 --- a/pom.xml +++ b/pom.xml @@ -45,6 +45,19 @@ + + org.apache.maven.plugins + maven-failsafe-plugin + 2.19.1 + + + + integration-test + verify + + + + @@ -73,7 +86,7 @@ junit junit - 3.8.1 + 4.11 test @@ -82,5 +95,11 @@ 1.9.5 test + + com.amazonaws + aws-java-sdk + 1.11.109 + test + diff --git a/run-integration-tests.sh b/run-integration-tests.sh new file mode 100755 index 0000000..29770bb --- /dev/null +++ b/run-integration-tests.sh @@ -0,0 +1,43 @@ +#!/usr/bin/env bash + +set -e + +trap "docker-compose down" EXIT + +if uname | grep -q Linux; then + export DOCKER_BIND_IP=$(ip route | awk '/default/ { print $3 }') +elif uname | grep -q Darwin; then + export DOCKER_BIND_IP=$(ifconfig en0 | grep inet | grep -v inet6 | awk '{print $2}') +else + echo "Unsupported operating system." 
+    exit 1
+fi
+
+echo -e "\nDocker bind IP address: $DOCKER_BIND_IP\n"
+
+export KAFKA_HOST=$DOCKER_BIND_IP
+export KAFKA_ADVERTISED_LISTENERS="PLAINTEXT://${KAFKA_HOST}:9092"
+export KAFKA_BROKERS="${KAFKA_HOST}:9092"
+
+# Building connector JAR and starting test cluster
+mvn clean package
+docker-compose up -d
+sleep 10
+
+# Creating topic for test messages
+docker-compose exec broker \
+kafka-topics --zookeeper zookeeper:2181 \
+--topic test-topic \
+--create \
+--partitions 3 \
+--replication-factor 1
+
+sleep 5
+
+# Submitting JSON configuration
+curl -H "Content-Type: application/json" \
+--data "@src/test/resources/test-connector.json" \
+http://localhost:8083/connectors
+
+# Running integration tests
+mvn verify
diff --git a/src/test/java/com/deviantart/kafka_connect_s3/ConnectorIT.java b/src/test/java/com/deviantart/kafka_connect_s3/ConnectorIT.java
new file mode 100644
index 0000000..4c6155a
--- /dev/null
+++ b/src/test/java/com/deviantart/kafka_connect_s3/ConnectorIT.java
@@ -0,0 +1,199 @@
+package com.deviantart.kafka_connect_s3;
+
+import com.amazonaws.auth.BasicAWSCredentials;
+import com.amazonaws.services.s3.AmazonS3Client;
+import com.amazonaws.services.s3.S3ClientOptions;
+import com.amazonaws.services.s3.model.GetObjectRequest;
+import com.amazonaws.services.s3.model.S3Object;
+import com.amazonaws.util.IOUtils;
+import org.apache.kafka.clients.producer.KafkaProducer;
+import org.apache.kafka.clients.producer.ProducerConfig;
+import org.apache.kafka.clients.producer.ProducerRecord;
+import org.apache.kafka.common.serialization.IntegerSerializer;
+import org.apache.kafka.common.serialization.StringSerializer;
+import org.joda.time.Instant;
+import org.junit.BeforeClass;
+import org.junit.Test;
+
+import java.io.ByteArrayOutputStream;
+import java.io.IOException;
+import java.io.InputStream;
+import java.nio.charset.Charset;
+import java.text.SimpleDateFormat;
+import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.Iterator;
+import java.util.List;
+import java.util.Properties;
+import java.util.concurrent.ExecutionException;
+import java.util.zip.GZIPInputStream;
+
+import static org.hamcrest.core.Is.is;
+import static org.junit.Assert.assertThat;
+
+public class ConnectorIT {
+
+    private static final String TEST_TOPIC_NAME = "test-topic";
+    private static final String BUCKET_NAME = "fakes3";
+    private static final String TODAY_FORMATTED =
+            new SimpleDateFormat("yyyy-MM-dd").format(Instant.now().toDate());
+    private static final String BUCKET_PREFIX = "connect-system-test/";
+    private static final String FILE_PREFIX = "systest/";
+    private static final String FILE_PREFIX_WITH_DATE = FILE_PREFIX + TODAY_FORMATTED + "/";
+    private static final Charset UTF8 = Charset.forName("UTF-8");
+
+    private static KafkaProducer<Integer, String> producer;
+    private static List<ProducerRecord<Integer, String>> messages;
+    private static List<String> expectedMessagesInS3PerPartition = Arrays.asList("", "", "");
+    private static AmazonS3Client s3Client;
+
+    @BeforeClass
+    public static void oneTimeSetUp() {
+
+        String kafkaBrokers = System.getenv("KAFKA_BROKERS");
+        String fakeS3Endpoint = "http://localhost:4569";
+
+        Properties producerProperties = new Properties();
+        producerProperties.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, kafkaBrokers);
+        producerProperties.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, IntegerSerializer.class);
+        producerProperties.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, StringSerializer.class);
+        producerProperties.put("producer.type", "async");
+
producerProperties.put(ProducerConfig.ACKS_CONFIG, "1"); + producerProperties.put(ProducerConfig.LINGER_MS_CONFIG, "5"); + + producer = new KafkaProducer<>(producerProperties); + + messages = new ArrayList<>(100); + + for (int i = 200; i < 300; i++) { + int partition = i % 3; + String message = "{\"foo\": \"bar\", \"counter\":" + i + "}"; + + String existingMessagesInS3PerPartition = expectedMessagesInS3PerPartition.get(partition); + existingMessagesInS3PerPartition += message + "\n"; + expectedMessagesInS3PerPartition.set(partition, existingMessagesInS3PerPartition); + + messages.add( + new ProducerRecord<>(TEST_TOPIC_NAME, partition, i, message) + ); + } + + BasicAWSCredentials credentials = new BasicAWSCredentials("foo", "bar"); + s3Client = new AmazonS3Client(credentials); + s3Client.setEndpoint(fakeS3Endpoint); + s3Client.setS3ClientOptions(new S3ClientOptions().withPathStyleAccess(true)); + } + + @Test + public void connectorShouldSaveFileInS3() throws InterruptedException, ExecutionException, IOException { + + Iterator> messagesIter = messages.iterator(); + while (messagesIter.hasNext()) { + producer.send(messagesIter.next()).get(); + } + + Thread.sleep(60_000L); + + /* + * Asserting messages saved from partition 0 + */ + + assertS3FileContents( + BUCKET_PREFIX + FILE_PREFIX + "last_chunk_index.test-topic-00000.txt", + FILE_PREFIX_WITH_DATE + "test-topic-00000-000000000000.index.json", + false, + UTF8 + ); + + assertS3FileContents( + BUCKET_PREFIX + FILE_PREFIX_WITH_DATE + "test-topic-00000-000000000000.index.json", + "{\"chunks\":[{\"byte_length_uncompressed\":990,\"num_records\":33,\"byte_length\":137,\"byte_offset\":0,\"first_record_offset\":0}]}", + false, + UTF8 + ); + + assertS3FileContents( + BUCKET_PREFIX + FILE_PREFIX_WITH_DATE + "test-topic-00000-000000000000.gz", + expectedMessagesInS3PerPartition.get(0), + true, + UTF8 + ); + + /* + * Asserting messages saved from partition 1 + */ + + assertS3FileContents( + BUCKET_PREFIX + FILE_PREFIX + "last_chunk_index.test-topic-00001.txt", + FILE_PREFIX_WITH_DATE + "test-topic-00001-000000000000.index.json", + false, + UTF8 + ); + + assertS3FileContents( + BUCKET_PREFIX + FILE_PREFIX_WITH_DATE + "test-topic-00001-000000000000.index.json", + "{\"chunks\":[{\"byte_length_uncompressed\":990,\"num_records\":33,\"byte_length\":137,\"byte_offset\":0,\"first_record_offset\":0}]}", + false, + UTF8 + ); + + assertS3FileContents( + BUCKET_PREFIX + FILE_PREFIX_WITH_DATE + "test-topic-00001-000000000000.gz", + expectedMessagesInS3PerPartition.get(1), + true, + UTF8 + ); + + /* + * Asserting messages saved from partition 2 + */ + + assertS3FileContents( + BUCKET_PREFIX + FILE_PREFIX + "last_chunk_index.test-topic-00002.txt", + FILE_PREFIX_WITH_DATE + "test-topic-00002-000000000000.index.json", + false, + UTF8 + ); + + assertS3FileContents( + BUCKET_PREFIX + FILE_PREFIX_WITH_DATE + "test-topic-00002-000000000000.index.json", + "{\"chunks\":[{\"byte_length_uncompressed\":1020,\"num_records\":34,\"byte_length\":139,\"byte_offset\":0,\"first_record_offset\":0}]}", + false, + UTF8 + ); + + assertS3FileContents( + BUCKET_PREFIX + FILE_PREFIX_WITH_DATE + "test-topic-00002-000000000000.gz", + expectedMessagesInS3PerPartition.get(2), + true, + UTF8 + ); + } + + private void assertS3FileContents(String key, String content, boolean gzipped, Charset encoding) throws IOException { + S3Object s3Object = s3Client.getObject(new GetObjectRequest(BUCKET_NAME, key)); + InputStream objectInputStream = s3Object.getObjectContent(); + String objectContent = null; + + 
if (gzipped) { + byte[] res = decompressGzipContent(objectInputStream); + objectContent = new String(res); + } else { + objectContent = IOUtils.toString(objectInputStream); + } + + s3Object.close(); + + assertThat(objectContent, is(content)); + } + + private byte[] decompressGzipContent(InputStream is){ + ByteArrayOutputStream out = new ByteArrayOutputStream(); + try{ + IOUtils.copy(new GZIPInputStream(is), out); + } catch(IOException e){ + throw new RuntimeException(e); + } + return out.toByteArray(); + } +} diff --git a/src/test/resources/test-connector.json b/src/test/resources/test-connector.json new file mode 100644 index 0000000..19c4982 --- /dev/null +++ b/src/test/resources/test-connector.json @@ -0,0 +1,13 @@ +{ + "name": "s3-sink-connector", + "config": { + "connector.class": "com.deviantart.kafka_connect_s3.S3SinkConnector", + "tasks.max": "1", + "topics": "test-topic", + "s3.bucket": "connect-system-test", + "s3.prefix": "systest", + "s3.endpoint": "http://fakes3:4569", + "s3.path_style": "true", + "local.buffer.dir": "/tmp" + } +} diff --git a/system_test/.gitignore b/system_test/.gitignore deleted file mode 100644 index ded1d5b..0000000 --- a/system_test/.gitignore +++ /dev/null @@ -1,2 +0,0 @@ -standalone-kafka -data diff --git a/system_test/README.md b/system_test/README.md deleted file mode 100644 index b4e4442..0000000 --- a/system_test/README.md +++ /dev/null @@ -1,68 +0,0 @@ -# System Test - -## Installing Dependencies - -These steps should be required just once. - - - Install [Docker](https://docker.io) - - Install [standalone kafka docker tool](https://github.com/DeviantArt/standalone-kafka), _in the `system_test` directory_. It's already in `.gitignore`. If you already have it elsewhere or wish to share the installation, create a symlink so that we can find the helper scripts in `./system-test/standalone-kafka/kafka/bin/` relative to this repo's root. - - Note that we rely on `auto.create.topics.enable = true` in the kafka broker config -```sh -$ git clone https://github.com/DeviantArt/standalone-kafka.git -$ cd standalone-kafka -$ docker build -t deviantart/standalone-kafka . -``` - - Install [FakeS3](https://github.com/jubos/fake-s3) (assumes you have ruby/gem installed) -```sh -$ [sudo] gem install fakes3 -``` - - Install [kafka-python](https://github.com/dpkp/kafka-python) -```sh -$ [sudo] pip install kafka-python -``` - - Install [boto](https://github.com/boto/boto) -```sh -$ [sudo] pip install boto -``` - -## Setup Before Test Session - -Since setup is somewhat expensive and complicated to automate in a highly portable fashion, we -make setup a somewhat manual step to perform before running tests. - -### Start standalone-kafka Docker image - -Kafka must be accessible on `localhost:9092`. - -Run it with: - -```sh -$ docker run -d -p 2181:2181 -p 9092:9092 --name kafka deviantart/standalone-kafka -``` - -The `name` param means you can use `$ docker kill kafka` when you're done. - -### Map ports (old versions of Docker) - -On newer versions of Docker (where Docker daemon is running directly on host kernel) you should be done. - -Older versions of Docker, where docker is running inside of a virtual machine, you now need to forward localhost ports to the virtual machine. 
- -Assuming you're using [docker-machine](https://docs.docker.com/machine/overview/) to manage the virtual machine, you can map the ports with: - -```sh -$ docker-machine ssh default -f -N -L 9092:localhost:9092 -L 2181:localhost:2181 -``` - -**Note:** you need a recent version of docker-machine for this to work. Known to work on 0.5.6. - -The same should work with regular ssh for a non-docker machine VM. - - -## Running Tests - -From the repo root dir, run: - -```sh -$ python system_test/run.py -``` diff --git a/system_test/run.py b/system_test/run.py deleted file mode 100644 index f6d85ef..0000000 --- a/system_test/run.py +++ /dev/null @@ -1,344 +0,0 @@ -import unittest -import subprocess -import os -import shutil -import time -from datetime import date -import sys -import socket -from kafka import KafkaProducer -import json -from boto.s3.connection import S3Connection, OrdinaryCallingFormat -import zlib -import xml.etree.ElementTree as ET - -this_dir = os.path.dirname(__file__) - -fakes3_data_path = os.path.join(this_dir, 'data') -fixture_path = os.path.join(this_dir, 'fixture') -# as configured in system-test-s3-sink.properties. might not be super portable -# but simplest option for now without needing to dynamically write that config -# for each run... -connect_data_path = '/tmp/connect-system-test' - -g_fakes3_proc = None -g_s3connect_proc = None - -def modulo_partitioner(key, all_partitions, available_partitions): - if key is None: - key = 0 - idx = int(key) % len(all_partitions) - return all_partitions[idx] - -g_producer = KafkaProducer(bootstrap_servers="localhost:9092", - partitioner=modulo_partitioner, - metadata_max_age_ms=1000); - -g_s3_conn = S3Connection('foo', 'bar', is_secure=False, port=9090, host='localhost', - calling_format=OrdinaryCallingFormat()) - - -# requires proc to be Popened with stdout=subprocess.PIPE,stderr=subprocess.STDOUT -def dumpServerStdIO(proc, msg, until=None, until_fail=None, timeout=None, trim_indented=False, post_fail_lines=20): - sys.stdout.write(msg + os.linesep) - - if not proc: - return (False, None) - - start = time.time() - # After we see fail, add another few lines to see the full error - post_fail_lines_remaining = post_fail_lines - fail_line = None - while True: - try: - line = proc.stdout.readline() - if not line: - break - - if fail_line is not None: - if post_fail_lines_remaining <= 0: - return (False, fail_line) - else: - sys.stderr.write(" STDIO> " + line) - post_fail_lines_remaining -= 1 - continue - - if not trim_indented or not line.startswith((' ', '\t')): - sys.stderr.write(" STDIO> " + line) - - if until_fail and line.find(until_fail) >= 0: - fail_line = line - continue - if until and line.find(until) >= 0: - return (True, line) - - if timeout is not None and (time.time() - start) > timeout: - return (False, "Timedout after {} second".format(time.time() - start)) - - except (KeyboardInterrupt, SystemExit): - tearDownModule() - sys.exit(1) - - return (True, None) - -def setUpModule(): - global g_fakes3_proc, g_s3_conn - - # Clean up data from previous runs - if os.path.isdir(fakes3_data_path): - shutil.rmtree(fakes3_data_path) - - if os.path.isdir('/tmp/connect-system-test'): - shutil.rmtree('/tmp/connect-system-test') - - # Recreate the dirs! 
- os.mkdir(fakes3_data_path) - os.mkdir(connect_data_path) - - # Clear our topic from Kafka - try: - subprocess.check_output([os.path.join(this_dir, 'standalone-kafka/kafka/bin/kafka-topics.sh'), - '--zookeeper', 'localhost:2181', '--delete', '--topic', 'system-test']); - except subprocess.CalledProcessError as e: - # if the complaint is that the topic doesn't exist then ignore it, otherwise fail loudly - if e.output.find("Topic system-test does not exist on ZK path") < 0: - raise e - - # Recreate fresh - output = subprocess.check_output([os.path.join(this_dir, 'standalone-kafka/kafka/bin/kafka-topics.sh'), - '--zookeeper', 'localhost:2181', '--create', '--topic', 'system-test', - '--partitions', '1', '--replication-factor', '1']); - if output != "Created topic \"system-test\".\n": - raise RuntimeError("Failed to create test topic:\n{}".format(output)) - - # Run fakeS3 - print "Starting FakeS3..." - g_fakes3_proc = subprocess.Popen(['fakes3', '-p', '9090', '-r', fakes3_data_path], - stdout=subprocess.PIPE, - stderr=subprocess.STDOUT); - - try: - print "While we wait, let's do very basic check that kafka is up" - sock = socket.create_connection(('localhost', 9092), 1) - # Connected without throwing timeout exception so just close again - sock.close(); - print "Great, Kafka seems to be there." - - dumpServerStdIO(g_fakes3_proc, "Just waiting for FakeS3 to be ready...", "WEBrick::HTTPServer#start"); - - # ensure bucket is created - g_s3_conn.create_bucket('connect-system-test') - - print "SETUP DONE" - - except: - tearDownModule() - raise - -def tearDownModule(): - global g_fakes3_proc - if g_s3connect_proc is not None: - print "Terminating Kafka Connect" - g_s3connect_proc.kill() - g_s3connect_proc.wait() - if g_fakes3_proc is not None: - print "Terminating FakeS3" - g_fakes3_proc.kill() - g_fakes3_proc.wait() - - print "TEARDOWN DONE" - -def runS3ConnectStandalone(): - global g_s3connect_proc - # quick hack to get version from pom. - tree = ET.parse(os.path.join(this_dir, '..', 'pom.xml')) - root = tree.getroot() - version = root.find('{http://maven.apache.org/POM/4.0.0}version').text - env = { - 'CLASSPATH': os.path.join(this_dir, '../target/kafka-connect-s3-{}.jar'.format(version)) - } - cmd = [os.path.join(this_dir, 'standalone-kafka/kafka/bin/connect-standalone.sh'), - os.path.join(this_dir, 'system-test-worker.properties'), - os.path.join(this_dir, 'system-test-s3-sink.properties')] - - g_s3connect_proc = subprocess.Popen(cmd, - stdout=subprocess.PIPE, - stderr=subprocess.STDOUT, - env=env) - - dumpServerStdIO(g_s3connect_proc, - "Wait for S3 connect initialisation...", - "finished initialization and start", - trim_indented=True) - - return g_s3connect_proc - -class TestConnectS3(unittest.TestCase): - ''' - These tests are highly non-deterministic, but they pass almost always on my local setup. - - Controlling things like time, exactly when connect flushes etc are not really possible. - - Making validation here so smart that it can correctly identify any valid output of the system - without false positives is a strictly harder programming problem than the system under test. - - So this serves as a manually-run set of smoke tests that sanity check the integration logic of - the implementation, and automate a many-step ad-hoc testing environment. 
- ''' - def test_basic_consuming(self): - global g_producer - topic = "system-test" - - s3connect = runS3ConnectStandalone() - - # messages produced asynchronously - synchronous producing makes it likely - # they will be split into different flushes in connect - expected_data = '' - for i in range(0, 100): - record = b'{{"foo": "bar", "counter":{}}}'.format(i) - g_producer.send(topic, record) - expected_data += record + '\n' - - ok, line = dumpServerStdIO(s3connect, "Wait for connect to process and commit", - until="Successfully uploaded chunk for system-test-0", - until_fail="ERROR", - timeout=5, trim_indented=True) - - self.assertTrue(ok, msg="Didn't get success message but did get: {}".format(line)) - - today = date.today() - - pfx = 'systest/{}/'.format(today.isoformat()) - - # Fetch the files written and assert they are as expected - self.assert_s3_file_contents('systest/last_chunk_index.system-test-00000.txt', - pfx+'system-test-00000-000000000000.index.json') - - self.assert_s3_file_contents(pfx+'system-test-00000-000000000000.index.json', - '{"chunks":[{"byte_length_uncompressed":2890,"num_records":100,"byte_length":275,"byte_offset":0,"first_record_offset":0}]}') - - - self.assert_s3_file_contents(pfx+'system-test-00000-000000000000.gz', expected_data, gzipped=True) - - # Now stop the connect process and restart it and ensure it correctly resumes from where we left - print "Restarting Kafka Connect" - s3connect.kill() - s3connect.wait() - - # produce 100 more entries - expected_data = '' - for i in range(100, 200): - record = b'{{"foo": "bar", "counter":{}}}'.format(i) - g_producer.send(topic, record) - expected_data += record + '\n' - - # restart connect - s3connect = runS3ConnectStandalone() - - ok, line = dumpServerStdIO(s3connect, "Wait for connect to process and commit", - until="Successfully uploaded chunk for system-test-0", - until_fail="ERROR", - timeout=5, trim_indented=True) - - self.assertTrue(ok, msg="Didn't get success message but did get: {}".format(line)) - - today = date.today() - - pfx = 'systest/{}/'.format(today.isoformat()) - - # Fetch the files written and assert they are as expected - self.assert_s3_file_contents('systest/last_chunk_index.system-test-00000.txt', - pfx+'system-test-00000-000000000100.index.json') - - self.assert_s3_file_contents(pfx+'system-test-00000-000000000100.index.json', - '{"chunks":[{"byte_length_uncompressed":3000,"num_records":100,"byte_length":272,"byte_offset":0,"first_record_offset":100}]}') - - - self.assert_s3_file_contents(pfx+'system-test-00000-000000000100.gz', expected_data, gzipped=True) - - # now we test reconfiguring the topic to have more partitions... - print "Reconfiguring topic..." 
- output = subprocess.check_output([os.path.join(this_dir, 'standalone-kafka/kafka/bin/kafka-topics.sh'), - '--zookeeper', 'localhost:2181', '--alter', '--topic', 'system-test', - '--partitions', '3']); - - if not output.endswith("Adding partitions succeeded!\n"): - raise RuntimeError("Failed to reconfigure test topic:\n{}".format(output)) - - # wait for out producer to catch up with the reconfiguration otherwise we'll keep producing only - # to the single partition - while len(g_producer.partitions_for('system-test')) < 3: - print "Waiting for new partitions to show up in producer" - time.sleep(0.5) - - # produce some more, this time with keys so we know where they will end up - expected_partitions = ['','',''] - for i in range(200, 300): - record = b'{{"foo": "bar", "counter":{}}}'.format(i) - g_producer.send(topic, key=bytes(i), value=record) - expected_partitions[i % 3] += record + '\n' - - # wait for all three partitions to commit (not we don't match partition number as) - # we can't assume what order they will appear in. - ok, line = dumpServerStdIO(s3connect, "Wait for connect to process and commit 1/3", - until="Successfully uploaded chunk for system-test-", - until_fail="ERROR", - timeout=5, trim_indented=True) - self.assertTrue(ok, msg="Didn't get success message but did get: {}".format(line)) - - ok, line = dumpServerStdIO(s3connect, "Wait for connect to process and commit 2/3", - until="Successfully uploaded chunk for system-test-", - until_fail="ERROR", - timeout=5, trim_indented=True) - self.assertTrue(ok, msg="Didn't get success message but did get: {}".format(line)) - - ok, line = dumpServerStdIO(s3connect, "Wait for connect to process and commit 3/3", - until="Successfully uploaded chunk for system-test-", - until_fail="ERROR", - timeout=5, trim_indented=True) - self.assertTrue(ok, msg="Didn't get success message but did get: {}".format(line)) - - # partition 0 - self.assert_s3_file_contents('systest/last_chunk_index.system-test-00000.txt', - pfx+'system-test-00000-000000000200.index.json') - - self.assert_s3_file_contents(pfx+'system-test-00000-000000000200.index.json', - '{"chunks":[{"byte_length_uncompressed":990,"num_records":33,"byte_length":137,"byte_offset":0,"first_record_offset":200}]}') - - - self.assert_s3_file_contents(pfx+'system-test-00000-000000000200.gz', expected_partitions[0], gzipped=True) - - # partition 1 (new partition will start from offset 0) - self.assert_s3_file_contents('systest/last_chunk_index.system-test-00001.txt', - pfx+'system-test-00001-000000000000.index.json') - - self.assert_s3_file_contents(pfx+'system-test-00001-000000000000.index.json', - '{"chunks":[{"byte_length_uncompressed":990,"num_records":33,"byte_length":137,"byte_offset":0,"first_record_offset":0}]}') - - - self.assert_s3_file_contents(pfx+'system-test-00001-000000000000.gz', expected_partitions[1], gzipped=True) - - # partition 2 (new partition will start from offset 0) - self.assert_s3_file_contents('systest/last_chunk_index.system-test-00002.txt', - pfx+'system-test-00002-000000000000.index.json') - - self.assert_s3_file_contents(pfx+'system-test-00002-000000000000.index.json', - '{"chunks":[{"byte_length_uncompressed":1020,"num_records":34,"byte_length":139,"byte_offset":0,"first_record_offset":0}]}') - - - self.assert_s3_file_contents(pfx+'system-test-00002-000000000000.gz', expected_partitions[2], gzipped=True) - - def assert_s3_file_contents(self, key, content, gzipped=False, encoding="utf-8"): - global g_s3_conn - bucket = g_s3_conn.get_bucket('connect-system-test') 
- file = bucket.get_key(key) - actual_contents = file.get_contents_as_string() - if gzipped: - # Hacks, http://stackoverflow.com/questions/2695152/in-python-how-do-i-decode-gzip-encoding - actual_contents = zlib.decompress(actual_contents, 16+zlib.MAX_WBITS) - - self.assertEqual(content, actual_contents.decode(encoding)) - - -if __name__ == '__main__': - unittest.main() diff --git a/system_test/system-test-s3-sink.properties b/system_test/system-test-s3-sink.properties deleted file mode 100644 index 956659e..0000000 --- a/system_test/system-test-s3-sink.properties +++ /dev/null @@ -1,9 +0,0 @@ -name=s3-sink -connector.class=com.deviantart.kafka_connect_s3.S3SinkConnector -tasks.max=3 -topics=system-test -s3.bucket=connect-system-test -s3.prefix=systest -s3.endpoint=http://localhost:9090 -s3.path_style=true -local.buffer.dir=/tmp/connect-system-test \ No newline at end of file diff --git a/system_test/system-test-worker.properties b/system_test/system-test-worker.properties deleted file mode 100644 index 705ab1d..0000000 --- a/system_test/system-test-worker.properties +++ /dev/null @@ -1,37 +0,0 @@ -# Licensed to the Apache Software Foundation (ASF) under one or more -# contributor license agreements. See the NOTICE file distributed with -# this work for additional information regarding copyright ownership. -# The ASF licenses this file to You under the Apache License, Version 2.0 -# (the "License"); you may not use this file except in compliance with -# the License. You may obtain a copy of the License at -# -# http://www.apache.org/licenses/LICENSE-2.0 -# -# Unless required by applicable law or agreed to in writing, software -# distributed under the License is distributed on an "AS IS" BASIS, -# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. -# See the License for the specific language governing permissions and -# limitations under the License. - -# These are defaults. This file just demonstrates how to override some settings. -bootstrap.servers=localhost:9092 - -# The converters specify the format of data in Kafka and how to translate it into Connect data. Every Connect user will -# need to configure these based on the format they want their data in when loaded from or stored into Kafka -key.converter=org.apache.kafka.connect.storage.StringConverter -value.converter=org.apache.kafka.connect.storage.StringConverter - -# The internal converter used for offsets and config data is configurable and must be specified, but most users will -# always want to use the built-in default. Offset and config data is never visible outside of Copcyat in this format. 
-internal.key.converter=org.apache.kafka.connect.json.JsonConverter -internal.value.converter=org.apache.kafka.connect.json.JsonConverter -internal.key.converter.schemas.enable=true -internal.value.converter.schemas.enable=true - -offset.storage.file.filename=/tmp/connect.offsets - -# Flush much faster than normal, which is useful for testing/debugging -offset.flush.interval.ms=1000 - -# Reload metadata faster too so consumer picks up new topics -consumer.metadata.max.age.ms=1000 From 7a04023b63ab61434ad3f39d8e29028bb9d7cd05 Mon Sep 17 00:00:00 2001 From: R Vadai Date: Wed, 29 Mar 2017 19:05:33 +0100 Subject: [PATCH 02/11] Refactored block gzip file writer --- .../kafka_connect_s3/BlockFileWriter.java | 183 ++++++++++++++ .../kafka_connect_s3/BlockGZIPFileWriter.java | 233 ++---------------- .../deviantart/kafka_connect_s3/Chunk.java | 9 + .../CountingOutputStream.java | 33 +++ 4 files changed, 247 insertions(+), 211 deletions(-) create mode 100644 src/main/java/com/deviantart/kafka_connect_s3/BlockFileWriter.java create mode 100644 src/main/java/com/deviantart/kafka_connect_s3/Chunk.java create mode 100644 src/main/java/com/deviantart/kafka_connect_s3/CountingOutputStream.java diff --git a/src/main/java/com/deviantart/kafka_connect_s3/BlockFileWriter.java b/src/main/java/com/deviantart/kafka_connect_s3/BlockFileWriter.java new file mode 100644 index 0000000..cacf92e --- /dev/null +++ b/src/main/java/com/deviantart/kafka_connect_s3/BlockFileWriter.java @@ -0,0 +1,183 @@ +package com.deviantart.kafka_connect_s3; + +import java.io.BufferedWriter; +import java.io.BufferedWriter; +import java.io.File; +import java.io.FileNotFoundException; +import java.io.FileOutputStream; +import java.io.FileWriter; +import java.io.FilterOutputStream; +import java.io.IOException; +import java.io.OutputStream; +import java.io.OutputStreamWriter; +import java.io.UnsupportedEncodingException; + +import java.util.ArrayList; +import java.util.zip.GZIPOutputStream; + +import org.json.simple.JSONArray; +import org.json.simple.JSONObject; + +abstract class BlockFileWriter { + private String filenameBase; + private String path; + protected BufferedWriter writer; + protected CountingOutputStream fileStream; + + protected ArrayList chunks; + + // Default each chunk is 64MB of uncompressed data + private long chunkThreshold; + + // Offset to the first record. + // Set to non-zero if this file is part of a larger stream and you want + // record offsets in the index to reflect the global offset rather than local + private long firstRecordOffset; + + BlockFileWriter(String filenameBase, String path, long firstRecordOffset, long chunkThreshold) throws IOException { + this.filenameBase = filenameBase; + this.path = path; + this.firstRecordOffset = firstRecordOffset; + this.chunkThreshold = chunkThreshold; + + chunks = new ArrayList(); + + // Initialize first chunk + Chunk ch = new Chunk(); + ch.firstOffset = firstRecordOffset; + chunks.add(ch); + + // Explicitly truncate the file. On linux and OS X this appears to happen + // anyway when opening with FileOutputStream but that behavior is not actually documented + // or specified anywhere so let's be rigorous about it. 
+ FileOutputStream fos = new FileOutputStream(new File(getDataFilePath())); + fos.getChannel().truncate(0); + + // Open file for writing and setup + this.fileStream = new CountingOutputStream(fos); + initChunkWriter(); + } + + abstract protected void initChunkWriter() throws IOException; + abstract protected void finishChunk() throws IOException; + + protected Chunk currentChunk() { + return chunks.get(chunks.size() - 1); + } + + public long getFirstRecordOffset() { + return firstRecordOffset; + } + + public String getDataFileName() { + return String.format("%s-%012d.gz", filenameBase, firstRecordOffset); + } + + public String getIndexFileName() { + return String.format("%s-%012d.index.json", filenameBase, firstRecordOffset); + } + + public String getDataFilePath() { + return String.format("%s/%s", path, this.getDataFileName()); + } + + public String getIndexFilePath() { + return String.format("%s/%s", path, this.getIndexFileName()); + } + + /** + * Writes string to file, assuming this is a single record + * + * If there is no newline at then end we will add one + */ + public void write(String record) throws IOException { + Chunk ch = currentChunk(); + + boolean hasNewLine = record.endsWith("\n"); + + int rawBytesToWrite = record.length(); + if (!hasNewLine) { + rawBytesToWrite += 1; + } + + if ((ch.rawBytes + rawBytesToWrite) > chunkThreshold) { + finishChunk(); + initChunkWriter(); + + Chunk newCh = new Chunk(); + newCh.firstOffset = ch.firstOffset + ch.numRecords; + newCh.byteOffset = ch.byteOffset + ch.compressedByteLength; + chunks.add(newCh); + ch = newCh; + } + + writer.append(record); + if (!hasNewLine) { + writer.newLine(); + } + ch.rawBytes += rawBytesToWrite; + ch.numRecords++; + } + + public void delete() throws IOException { + deleteIfExists(getDataFilePath()); + deleteIfExists(getIndexFilePath()); + } + + private void deleteIfExists(String path) throws IOException { + File f = new File(path); + if (f.exists() && !f.isDirectory()) { + f.delete(); + } + } + + public void close() throws IOException { + // Flush last chunk, updating index + finishChunk(); + // Now close the writer (and the whole stream stack) + writer.close(); + writeIndex(); + } + + private void writeIndex() throws IOException { + JSONArray chunkArr = new JSONArray(); + + for (Chunk ch : chunks) { + JSONObject chunkObj = new JSONObject(); + chunkObj.put("first_record_offset", ch.firstOffset); + chunkObj.put("num_records", ch.numRecords); + chunkObj.put("byte_offset", ch.byteOffset); + chunkObj.put("byte_length", ch.compressedByteLength); + chunkObj.put("byte_length_uncompressed", ch.rawBytes); + chunkArr.add(chunkObj); + } + + JSONObject index = new JSONObject(); + index.put("chunks", chunkArr); + + try (FileWriter file = new FileWriter(getIndexFilePath())) { + file.write(index.toJSONString()); + file.close(); + } + } + + public int getTotalUncompressedSize() { + int totalBytes = 0; + for (Chunk ch : chunks) { + totalBytes += ch.rawBytes; + } + return totalBytes; + } + + public int getNumChunks() { + return chunks.size(); + } + + public int getNumRecords() { + int totalRecords = 0; + for (Chunk ch : chunks) { + totalRecords += ch.numRecords; + } + return totalRecords; + } +} diff --git a/src/main/java/com/deviantart/kafka_connect_s3/BlockGZIPFileWriter.java b/src/main/java/com/deviantart/kafka_connect_s3/BlockGZIPFileWriter.java index 1400307..8bd0c22 100644 --- a/src/main/java/com/deviantart/kafka_connect_s3/BlockGZIPFileWriter.java +++ b/src/main/java/com/deviantart/kafka_connect_s3/BlockGZIPFileWriter.java @@ 
-35,227 +35,38 @@ * Note that thanks to GZIP spec, the overall file is perfectly valid and will compress as if it was a single stream * with any regular GZIP decoding library or program. */ -public class BlockGZIPFileWriter { - private String filenameBase; - private String path; - private GZIPOutputStream gzipStream; - private BufferedWriter writer; - private CountingOutputStream fileStream; +public class BlockGZIPFileWriter extends BlockFileWriter { - private class Chunk { - public long rawBytes = 0; - public long byteOffset = 0; - public long compressedByteLength = 0; - public long firstOffset = 0; - public long numRecords = 0; - }; + private GZIPOutputStream gzipStream; - private class CountingOutputStream extends FilterOutputStream { - private long numBytes = 0; - - CountingOutputStream(OutputStream out) throws IOException { - super(out); - } - - @Override - public void write(int b) throws IOException { - out.write(b); - numBytes++; - } - @Override - public void write(byte[] b) throws IOException { - out.write(b); - numBytes += b.length; - } - @Override - public void write(byte[] b, int off, int len) throws IOException { - out.write(b, off, len); - numBytes += len; - } - - public long getNumBytesWritten() { - return numBytes; - } - }; - - private ArrayList chunks; - - // Default each chunk is 64MB of uncompressed data - private long chunkThreshold; - - // Offset to the first record. - // Set to non-zero if this file is part of a larger stream and you want - // record offsets in the index to reflect the global offset rather than local - private long firstRecordOffset; - - public BlockGZIPFileWriter(String filenameBase, String path) throws FileNotFoundException, IOException { - this(filenameBase, path, 0, 67108864); - } - - public BlockGZIPFileWriter(String filenameBase, String path, long firstRecordOffset) throws FileNotFoundException, IOException { - this(filenameBase, path, firstRecordOffset, 67108864); - } - - public BlockGZIPFileWriter(String filenameBase, String path, long firstRecordOffset, long chunkThreshold) - throws FileNotFoundException, IOException - { - this.filenameBase = filenameBase; - this.path = path; - this.firstRecordOffset = firstRecordOffset; - this.chunkThreshold = chunkThreshold; - - chunks = new ArrayList(); - - // Initialize first chunk - Chunk ch = new Chunk(); - ch.firstOffset = firstRecordOffset; - chunks.add(ch); - - // Explicitly truncate the file. On linux and OS X this appears to happen - // anyway when opening with FileOutputStream but that behavior is not actually documented - // or specified anywhere so let's be rigorous about it. 
- FileOutputStream fos = new FileOutputStream(new File(getDataFilePath())); - fos.getChannel().truncate(0); - - // Open file for writing and setup - this.fileStream = new CountingOutputStream(fos); - initChunkWriter(); - } - - private void initChunkWriter() throws IOException, UnsupportedEncodingException { - gzipStream = new GZIPOutputStream(fileStream); - writer = new BufferedWriter(new OutputStreamWriter(gzipStream, "UTF-8")); - } - - private Chunk currentChunk() { - return chunks.get(chunks.size() - 1); - } - - public long getFirstRecordOffset() { - return firstRecordOffset; - } - - public String getDataFileName() { - return String.format("%s-%012d.gz", filenameBase, firstRecordOffset); - } - - public String getIndexFileName() { - return String.format("%s-%012d.index.json", filenameBase, firstRecordOffset); - } - - public String getDataFilePath() { - return String.format("%s/%s", path, this.getDataFileName()); - } - - public String getIndexFilePath() { - return String.format("%s/%s", path, this.getIndexFileName()); - } - - /** - * Writes string to file, assuming this is a single record - * - * If there is no newline at then end we will add one - */ - public void write(String record) throws IOException { - Chunk ch = currentChunk(); - - boolean hasNewLine = record.endsWith("\n"); - - int rawBytesToWrite = record.length(); - if (!hasNewLine) { - rawBytesToWrite += 1; - } - - if ((ch.rawBytes + rawBytesToWrite) > chunkThreshold) { - finishChunk(); - initChunkWriter(); - - Chunk newCh = new Chunk(); - newCh.firstOffset = ch.firstOffset + ch.numRecords; - newCh.byteOffset = ch.byteOffset + ch.compressedByteLength; - chunks.add(newCh); - ch = newCh; + public BlockGZIPFileWriter(String filenameBase, String path) throws FileNotFoundException, IOException { + this(filenameBase, path, 0, 67108864); } - writer.append(record); - if (!hasNewLine) { - writer.newLine(); + public BlockGZIPFileWriter(String filenameBase, String path, long firstRecordOffset) throws FileNotFoundException, IOException { + this(filenameBase, path, firstRecordOffset, 67108864); } - ch.rawBytes += rawBytesToWrite; - ch.numRecords++; - } - - public void delete() throws IOException { - deleteIfExists(getDataFilePath()); - deleteIfExists(getIndexFilePath()); - } - private void deleteIfExists(String path) throws IOException { - File f = new File(path); - if (f.exists() && !f.isDirectory()) { - f.delete(); + public BlockGZIPFileWriter(String filenameBase, String path, long firstRecordOffset, long chunkThreshold) throws FileNotFoundException, IOException { + super(filenameBase, path, firstRecordOffset, chunkThreshold); } - } - private void finishChunk() throws IOException { - Chunk ch = currentChunk(); - - // Complete GZIP block without closing stream - writer.flush(); - gzipStream.finish(); - - // We can no find out how long this chunk was compressed - long bytesWritten = fileStream.getNumBytesWritten(); - ch.compressedByteLength = bytesWritten - ch.byteOffset; - } - - public void close() throws IOException { - // Flush last chunk, updating index - finishChunk(); - // Now close the writer (and the whole stream stack) - writer.close(); - writeIndex(); - } - - private void writeIndex() throws IOException { - JSONArray chunkArr = new JSONArray(); - - for (Chunk ch : chunks) { - JSONObject chunkObj = new JSONObject(); - chunkObj.put("first_record_offset", ch.firstOffset); - chunkObj.put("num_records", ch.numRecords); - chunkObj.put("byte_offset", ch.byteOffset); - chunkObj.put("byte_length", ch.compressedByteLength); - 
chunkObj.put("byte_length_uncompressed", ch.rawBytes); - chunkArr.add(chunkObj); - } - - JSONObject index = new JSONObject(); - index.put("chunks", chunkArr); - - try (FileWriter file = new FileWriter(getIndexFilePath())) { - file.write(index.toJSONString()); - file.close(); + @Override + protected void initChunkWriter() throws IOException, UnsupportedEncodingException { + gzipStream = new GZIPOutputStream(fileStream); + writer = new BufferedWriter(new OutputStreamWriter(gzipStream, "UTF-8")); } - } - public int getTotalUncompressedSize() { - int totalBytes = 0; - for (Chunk ch : chunks) { - totalBytes += ch.rawBytes; - } - return totalBytes; - } + @Override + protected void finishChunk() throws IOException { + Chunk ch = currentChunk(); - public int getNumChunks() { - return chunks.size(); - } + // Complete GZIP block without closing stream + writer.flush(); + gzipStream.finish(); - public int getNumRecords() { - int totalRecords = 0; - for (Chunk ch : chunks) { - totalRecords += ch.numRecords; + // We can no find out how long this chunk was compressed + long bytesWritten = fileStream.getNumBytesWritten(); + ch.compressedByteLength = bytesWritten - ch.byteOffset; } - return totalRecords; - } -} \ No newline at end of file +} diff --git a/src/main/java/com/deviantart/kafka_connect_s3/Chunk.java b/src/main/java/com/deviantart/kafka_connect_s3/Chunk.java new file mode 100644 index 0000000..90f029f --- /dev/null +++ b/src/main/java/com/deviantart/kafka_connect_s3/Chunk.java @@ -0,0 +1,9 @@ +package com.deviantart.kafka_connect_s3; + +class Chunk { + public long rawBytes = 0; + public long byteOffset = 0; + public long compressedByteLength = 0; + public long firstOffset = 0; + public long numRecords = 0; +}; diff --git a/src/main/java/com/deviantart/kafka_connect_s3/CountingOutputStream.java b/src/main/java/com/deviantart/kafka_connect_s3/CountingOutputStream.java new file mode 100644 index 0000000..be3be77 --- /dev/null +++ b/src/main/java/com/deviantart/kafka_connect_s3/CountingOutputStream.java @@ -0,0 +1,33 @@ +package com.deviantart.kafka_connect_s3; + +import java.io.FilterOutputStream; +import java.io.IOException; +import java.io.OutputStream; + +class CountingOutputStream extends FilterOutputStream { + private long numBytes = 0; + + CountingOutputStream(OutputStream out) throws IOException { + super(out); + } + + @Override + public void write(int b) throws IOException { + out.write(b); + numBytes++; + } + @Override + public void write(byte[] b) throws IOException { + out.write(b); + numBytes += b.length; + } + @Override + public void write(byte[] b, int off, int len) throws IOException { + out.write(b, off, len); + numBytes += len; + } + + public long getNumBytesWritten() { + return numBytes; + } +} From dcd16e55493bfbe27390eafeda3e07025b024dd2 Mon Sep 17 00:00:00 2001 From: R Vadai Date: Mon, 3 Apr 2017 11:37:05 +0100 Subject: [PATCH 03/11] Added bzip2 file handling --- pom.xml | 5 + .../BlockBZIP2FileWriter.java | 69 ++++ .../kafka_connect_s3/BlockFileWriter.java | 7 +- .../kafka_connect_s3/BlockGZIPFileWriter.java | 7 +- .../BlockBZIP2FileWriterTest.java | 235 ++++++++++++ .../BlockFileWriterTestCommon.java | 21 ++ .../BlockGZIPFileWriterTest.java | 351 +++++++++--------- 7 files changed, 509 insertions(+), 186 deletions(-) create mode 100644 src/main/java/com/deviantart/kafka_connect_s3/BlockBZIP2FileWriter.java create mode 100644 src/test/java/com/deviantart/kafka_connect_s3/BlockBZIP2FileWriterTest.java create mode 100644 
src/test/java/com/deviantart/kafka_connect_s3/BlockFileWriterTestCommon.java
diff --git a/pom.xml b/pom.xml
index 4a97f85..ec5715e 100644
--- a/pom.xml
+++ b/pom.xml
@@ -83,6 +83,11 @@
       <artifactId>aws-java-sdk-s3</artifactId>
       <version>${s3.version}</version>
     </dependency>
+    <dependency>
+      <groupId>org.apache.commons</groupId>
+      <artifactId>commons-compress</artifactId>
+      <version>1.13</version>
+    </dependency>
     <dependency>
       <groupId>junit</groupId>
       <artifactId>junit</artifactId>
diff --git a/src/main/java/com/deviantart/kafka_connect_s3/BlockBZIP2FileWriter.java b/src/main/java/com/deviantart/kafka_connect_s3/BlockBZIP2FileWriter.java
new file mode 100644
index 0000000..c13e50d
--- /dev/null
+++ b/src/main/java/com/deviantart/kafka_connect_s3/BlockBZIP2FileWriter.java
@@ -0,0 +1,69 @@
+package com.deviantart.kafka_connect_s3;
+
+import java.io.BufferedWriter;
+import java.io.FileNotFoundException;
+import java.io.IOException;
+import java.io.OutputStreamWriter;
+import java.io.UnsupportedEncodingException;
+
+import java.util.zip.GZIPOutputStream;
+
+import org.apache.commons.compress.compressors.bzip2.BZip2CompressorOutputStream;
+
+/**
+ * BlockBZIP2FileWriter accumulates newline delimited UTF-8 records and writes them to an
+ * output file that is readable by BZIP2.
+ *
+ * In fact this file is the concatenation of possibly many separate BZIP2 files corresponding to smaller chunks
+ * of the input. Alongside the output filename.bzip2 file, a file filename-index.json is written containing JSON
+ * metadata about the size and location of each block.
+ *
+ * This allows a reading class to skip to particular line/record without decompressing whole file by looking up
+ * the offset of the containing block, seeking to it and beginning BZIP2 read from there.
+ *
+ * This is especially useful when the file is an archive in HTTP storage like Amazon S3 where GET request with
+ * range headers can allow pulling a small segment from overall compressed file.
+ *
+ * Note that thanks to BZIP2 spec, the overall file is perfectly valid and will compress as if it was a single stream
+ * with any regular BZIP2 decoding library or program.
+ */
+public class BlockBZIP2FileWriter extends BlockFileWriter {
+
+    private BZip2CompressorOutputStream bzip2Stream;
+
+    public BlockBZIP2FileWriter(String filenameBase, String path) throws FileNotFoundException, IOException {
+        this(filenameBase, path, 0, 67108864);
+    }
+
+    public BlockBZIP2FileWriter(String filenameBase, String path, long firstRecordOffset) throws FileNotFoundException, IOException {
+        this(filenameBase, path, firstRecordOffset, 67108864);
+    }
+
+    public BlockBZIP2FileWriter(String filenameBase, String path, long firstRecordOffset, long chunkThreshold) throws FileNotFoundException, IOException {
+        super(filenameBase, path, firstRecordOffset, chunkThreshold);
+    }
+
+    @Override
+    protected void initChunkWriter() throws IOException, UnsupportedEncodingException {
+        bzip2Stream = new BZip2CompressorOutputStream(fileStream);
+        writer = new BufferedWriter(new OutputStreamWriter(bzip2Stream, "UTF-8"));
+    }
+
+    @Override
+    protected void finishChunk() throws IOException {
+        Chunk ch = currentChunk();
+
+        // Complete BZIP2 block without closing stream
+        writer.flush();
+        bzip2Stream.finish();
+
+        // We can now find out how long this chunk was compressed
+        long bytesWritten = fileStream.getNumBytesWritten();
+        ch.compressedByteLength = bytesWritten - ch.byteOffset;
+    }
+
+    @Override
+    public String getDataFileName() {
+        return String.format("%s-%012d.bzip2", filenameBase, super.getFirstRecordOffset());
+    }
+}
diff --git a/src/main/java/com/deviantart/kafka_connect_s3/BlockFileWriter.java b/src/main/java/com/deviantart/kafka_connect_s3/BlockFileWriter.java
index cacf92e..0f39624 100644
--- a/src/main/java/com/deviantart/kafka_connect_s3/BlockFileWriter.java
+++ b/src/main/java/com/deviantart/kafka_connect_s3/BlockFileWriter.java
@@ -19,7 +19,7 @@
 import org.json.simple.JSONObject;
 
 abstract class BlockFileWriter {
-    private String filenameBase;
+    protected String filenameBase;
     private String path;
     protected BufferedWriter writer;
     protected CountingOutputStream fileStream;
@@ -60,6 +60,7 @@ abstract class BlockFileWriter {
 
     abstract protected void initChunkWriter() throws IOException;
     abstract protected void finishChunk() throws IOException;
+    abstract protected String getDataFileName();
 
     protected Chunk currentChunk() {
         return chunks.get(chunks.size() - 1);
@@ -69,10 +70,6 @@ public long getFirstRecordOffset() {
         return firstRecordOffset;
     }
 
-    public String getDataFileName() {
-        return String.format("%s-%012d.gz", filenameBase, firstRecordOffset);
-    }
-
     public String getIndexFileName() {
         return String.format("%s-%012d.index.json", filenameBase, firstRecordOffset);
     }
diff --git a/src/main/java/com/deviantart/kafka_connect_s3/BlockGZIPFileWriter.java b/src/main/java/com/deviantart/kafka_connect_s3/BlockGZIPFileWriter.java
index 8bd0c22..185bdfb 100644
--- a/src/main/java/com/deviantart/kafka_connect_s3/BlockGZIPFileWriter.java
+++ b/src/main/java/com/deviantart/kafka_connect_s3/BlockGZIPFileWriter.java
@@ -27,7 +27,7 @@
  * metadata about the size and location of each block.
  *
  * This allows a reading class to skip to particular line/record without decompressing whole file by looking up
- * the offset of the containing block, seeking to it and beginning GZIp read from there.
+ * the offset of the containing block, seeking to it and beginning GZIP read from there.
  *
  * This is especially useful when the file is an archive in HTTP storage like Amazon S3 where GET request with
  * range headers can allow pulling a small segment from overall compressed file.
@@ -69,4 +69,9 @@ protected void finishChunk() throws IOException { long bytesWritten = fileStream.getNumBytesWritten(); ch.compressedByteLength = bytesWritten - ch.byteOffset; } + + @Override + public String getDataFileName() { + return String.format("%s-%012d.gz", filenameBase, super.getFirstRecordOffset()); + } } diff --git a/src/test/java/com/deviantart/kafka_connect_s3/BlockBZIP2FileWriterTest.java b/src/test/java/com/deviantart/kafka_connect_s3/BlockBZIP2FileWriterTest.java new file mode 100644 index 0000000..e0c1853 --- /dev/null +++ b/src/test/java/com/deviantart/kafka_connect_s3/BlockBZIP2FileWriterTest.java @@ -0,0 +1,235 @@ +package com.deviantart.kafka_connect_s3; + +import org.apache.commons.compress.compressors.bzip2.BZip2CompressorInputStream; +import org.json.simple.JSONArray; +import org.json.simple.JSONObject; +import org.json.simple.parser.JSONParser; +import org.junit.BeforeClass; +import org.junit.Test; + +import java.io.BufferedReader; +import java.io.ByteArrayInputStream; +import java.io.File; +import java.io.FileInputStream; +import java.io.FileReader; +import java.io.InputStreamReader; +import java.io.RandomAccessFile; + +import static junit.framework.TestCase.assertTrue; +import static org.junit.Assert.assertEquals; +import static org.junit.Assert.assertFalse; + +public class BlockBZIP2FileWriterTest extends BlockFileWriterTestCommon { + + private static final String tmpDirPrefix = "BlockBZIP2FileWriterTest"; + + @BeforeClass + public static void oneTimeSetUp() { + + String tempDir = System.getProperty("java.io.tmpdir"); + tmpDir = new File(tempDir, tmpDirPrefix).toString(); + + System.out.println("Temp dir for writer test is: " + tmpDir); + } + + @Test + public void testPaths() throws Exception { + BlockBZIP2FileWriter w = new BlockBZIP2FileWriter("foo", tmpDir); + assertEquals(tmpDir + "/foo-000000000000.bzip2", w.getDataFilePath()); + assertEquals(tmpDir + "/foo-000000000000.index.json", w.getIndexFilePath()); + + + BlockBZIP2FileWriter w2 = new BlockBZIP2FileWriter("foo", tmpDir, 123456); + assertEquals(tmpDir + "/foo-000000123456.bzip2", w2.getDataFilePath()); + assertEquals(tmpDir + "/foo-000000123456.index.json", w2.getIndexFilePath()); + } + + @Test + public void testWrite() throws Exception { + // Very compressible 200 byte padding string to prepend to our unique line prefix + String pad = "0123456789012345678901234567890123456789012345678901234567890123456789012345678901234567890123456789" + + "0123456789012345678901234567890123456789012345678901234567890123456789012345678901234567890123456789"; + + // Make a writer with artificially small chunk threshold of 1kb + BlockBZIP2FileWriter w = new BlockBZIP2FileWriter("write-test", tmpDir, 987654321, 1000); + + int totalUncompressedBytes = 0; + String[] expectedLines = new String[50]; + // 50 records * 200 bytes padding should be at least 10 chunks worth + for (int i = 0; i < 50; i++) { + String line = String.format("Record %d %s", i, pad); + // Plus one for newline + totalUncompressedBytes += line.length() + 1; + // Expect to read without newlines... 
+ expectedLines[i] = line; + // But add newlines to half the input to verify writer adds them only if needed + if (i % 2 == 0) { + line += "\n"; + } + w.write(line); + } + + assertEquals(totalUncompressedBytes, w.getTotalUncompressedSize()); + assertEquals(50, w.getNumRecords()); + assertTrue("Should be at least 10 chunks in output file", w.getNumChunks() >= 10); + + w.close(); + + verifyOutputIsSaneBZIP2File(w.getDataFilePath(), expectedLines); + verifyIndexFile(w, 987654321, expectedLines); + } + + private void verifyOutputIsSaneBZIP2File(String filename, String[] expectedRecords) throws Exception { + BZip2CompressorInputStream zip = new BZip2CompressorInputStream(new FileInputStream(filename)); + BufferedReader r = new BufferedReader(new InputStreamReader(zip, "UTF-8")); + + String line; + int i = 0; + while ((line = r.readLine()) != null) { + assertTrue(String.format("Output file has more lines than expected. Expected %d lines", expectedRecords.length) + , i < expectedRecords.length); + + String expectedLine = expectedRecords[i]; + assertEquals(String.format("Output file doesn't match, first difference on line %d", i), expectedLine, line); + i++; + } + } + + private void verifyIndexFile(BlockBZIP2FileWriter w, int startOffset, String[] expectedRecords) throws Exception { + JSONParser parser = new JSONParser(); + + Object obj = parser.parse(new FileReader(w.getIndexFilePath())); + JSONObject index = (JSONObject) obj; + JSONArray chunks = (JSONArray) index.get("chunks"); + + assertEquals(w.getNumChunks(), chunks.size()); + + RandomAccessFile file = new RandomAccessFile(w.getDataFilePath(), "r"); + + // Check we can read all the chunks as individual bzip2 segments + int expectedStartOffset = startOffset; + int recordIndex = 0; + int totalBytes = 0; + int chunkIndex = 0; + for (Object chunk : chunks) { + JSONObject chunkObj = (JSONObject) chunk; + int firstOffset = (int) (long) chunkObj.get("first_record_offset"); + int numRecords = (int) (long) chunkObj.get("num_records"); + int byteOffset = (int) (long) chunkObj.get("byte_offset"); + int byteLength = (int) (long) chunkObj.get("byte_length"); + + assertEquals(expectedStartOffset, firstOffset); + assertTrue(byteLength > 0); + assertTrue(byteOffset >= 0); + + // Read just that segment of the file into byte array and attempt to parse BZIP2 + byte[] buffer = new byte[byteLength]; + file.seek(byteOffset); + int numBytesRead = file.read(buffer); + + assertEquals(buffer.length, numBytesRead); + + BZip2CompressorInputStream zip = new BZip2CompressorInputStream(new ByteArrayInputStream(buffer)); + BufferedReader r = new BufferedReader(new InputStreamReader(zip, "UTF-8")); + + int numRecordsActuallyInChunk = 0; + String line; + while ((line = r.readLine()) != null) { + assertEquals(expectedRecords[recordIndex], line); + recordIndex++; + numRecordsActuallyInChunk++; + } + + assertEquals(numRecordsActuallyInChunk, numRecords); + + totalBytes += byteLength; + + expectedStartOffset = firstOffset + numRecords; + + chunkIndex++; + } + + assertEquals("All chunks should cover all bytes in the file", totalBytes, file.length()); + } + + // Hmm this test is actually not very conclusive - on OS X and most linux file systems + // it passes anyway due to nature of filesystems. Not sure how to write something more robust + // though to validate that we definitiely truncate the files even if we write less data + + @Test + public void testShouldOverwrite() throws Exception { + // Make writer and write to it a bit. 
+ { + BlockBZIP2FileWriter w = new BlockBZIP2FileWriter("overwrite-test", tmpDir); + + // Write at least a few 4k blocks to disk so we can be sure that we don't + // only overwrite the first block. + String[] expectedLines = new String[5000]; + for (int i = 0; i < 5000; i++) { + String line = String.format("Record %d", i); + w.write(line); + expectedLines[i] = line; + } + + assertEquals(5000, w.getNumRecords()); + + w.close(); + + // Just check it actually write to disk + verifyOutputIsSaneBZIP2File(w.getDataFilePath(), expectedLines); + verifyIndexFile(w, 0, expectedLines); + + } + + { + // Now make a whole new writer for same chunk + BlockBZIP2FileWriter w = new BlockBZIP2FileWriter("overwrite-test", tmpDir); + + // Only write a few lines + String[] expectedLines2 = new String[10]; + for (int i = 0; i < 10; i++) { + String line = String.format("Overwrite record %d", i); + w.write(line); + expectedLines2[i] = line; + } + + assertEquals(10, w.getNumRecords()); + + w.close(); + + // No check output is only the 10 lines we just wrote + verifyOutputIsSaneBZIP2File(w.getDataFilePath(), expectedLines2); + verifyIndexFile(w, 0, expectedLines2); + } + } + + @Test + public void testDelete() throws Exception { + // Make writer and write to it a bit. + BlockBZIP2FileWriter w = new BlockBZIP2FileWriter("overwrite-test", tmpDir); + + String[] expectedLines = new String[5000]; + for (int i = 0; i < 5000; i++) { + String line = String.format("Record %d", i); + w.write(line); + expectedLines[i] = line; + } + + assertEquals(5000, w.getNumRecords()); + + w.close(); + + // Just check it actually write to disk + verifyOutputIsSaneBZIP2File(w.getDataFilePath(), expectedLines); + verifyIndexFile(w, 0, expectedLines); + + // Now remove it + w.delete(); + + File dataF = new File(w.getDataFilePath()); + File idxF = new File(w.getIndexFilePath()); + + assertFalse("Data file should not exist after delete", dataF.exists()); + assertFalse("Index file should not exist after delete", idxF.exists()); + } +} diff --git a/src/test/java/com/deviantart/kafka_connect_s3/BlockFileWriterTestCommon.java b/src/test/java/com/deviantart/kafka_connect_s3/BlockFileWriterTestCommon.java new file mode 100644 index 0000000..cf04aee --- /dev/null +++ b/src/test/java/com/deviantart/kafka_connect_s3/BlockFileWriterTestCommon.java @@ -0,0 +1,21 @@ +package com.deviantart.kafka_connect_s3; + +import org.junit.Before; + +import java.io.File; + +public abstract class BlockFileWriterTestCommon { + + protected Class compressedFileWriterClass; + + protected static String tmpDir; + + @Before + public void setUp() throws Exception { + File f = new File(tmpDir); + + if (!f.exists()) { + f.mkdir(); + } + } +} diff --git a/src/test/java/com/deviantart/kafka_connect_s3/BlockGZIPFileWriterTest.java b/src/test/java/com/deviantart/kafka_connect_s3/BlockGZIPFileWriterTest.java index 55f4389..ca54181 100644 --- a/src/test/java/com/deviantart/kafka_connect_s3/BlockGZIPFileWriterTest.java +++ b/src/test/java/com/deviantart/kafka_connect_s3/BlockGZIPFileWriterTest.java @@ -1,9 +1,5 @@ package com.deviantart.kafka_connect_s3; -import junit.framework.Test; -import junit.framework.TestCase; -import junit.framework.TestSuite; - import java.io.BufferedReader; import java.io.ByteArrayInputStream; import java.io.File; @@ -16,230 +12,225 @@ import org.json.simple.JSONArray; import org.json.simple.JSONObject; import org.json.simple.parser.JSONParser; +import org.junit.Before; +import org.junit.BeforeClass; +import org.junit.Test; -public class BlockGZIPFileWriterTest 
extends TestCase { - - private String tmpDirPrefix = "BlockGZIPFileWriterTest"; - private String tmpDir; - - public BlockGZIPFileWriterTest(String testName) { - super(testName); +import static junit.framework.TestCase.assertTrue; +import static org.junit.Assert.assertEquals; +import static org.junit.Assert.assertFalse; - String tempDir = System.getProperty("java.io.tmpdir"); - this.tmpDir = new File(tempDir, tmpDirPrefix).toString(); +public class BlockGZIPFileWriterTest extends BlockFileWriterTestCommon { - System.out.println("Temp dir for writer test is: " + tmpDir); - } + private static final String tmpDirPrefix = "BlockGZIPFileWriterTest"; - /** - * @return the suite of tests being tested - */ - public static Test suite() { - return new TestSuite(BlockGZIPFileWriterTest.class); - } + @BeforeClass + public static void oneTimeSetUp() { - @Override - protected void setUp() throws Exception { - File f = new File(tmpDir); + String tempDir = System.getProperty("java.io.tmpdir"); + tmpDir = new File(tempDir, tmpDirPrefix).toString(); - if (!f.exists()) { - f.mkdir(); - } - } - - public void testPaths() throws Exception { - BlockGZIPFileWriter w = new BlockGZIPFileWriter("foo", tmpDir); - assertEquals(tmpDir + "/foo-000000000000.gz", w.getDataFilePath()); - assertEquals(tmpDir + "/foo-000000000000.index.json", w.getIndexFilePath()); - - - BlockGZIPFileWriter w2 = new BlockGZIPFileWriter("foo", tmpDir, 123456); - assertEquals(tmpDir + "/foo-000000123456.gz", w2.getDataFilePath()); - assertEquals(tmpDir + "/foo-000000123456.index.json", w2.getIndexFilePath()); - } - - public void testWrite() throws Exception { - // Very compressible 200 byte padding string to prepend to our unique line prefix - String pad = "0123456789012345678901234567890123456789012345678901234567890123456789012345678901234567890123456789" - + "0123456789012345678901234567890123456789012345678901234567890123456789012345678901234567890123456789"; - - // Make a writer with artificially small chunk threshold of 1kb - BlockGZIPFileWriter w = new BlockGZIPFileWriter("write-test", tmpDir, 987654321, 1000); - - int totalUncompressedBytes = 0; - String[] expectedLines = new String[50]; - // 50 records * 200 bytes padding should be at least 10 chunks worth - for (int i = 0; i < 50; i++) { - String line = String.format("Record %d %s", i, pad); - // Plus one for newline - totalUncompressedBytes += line.length() + 1; - // Expect to read without newlines... 
- expectedLines[i] = line; - // But add newlines to half the input to verify writer adds them only if needed - if (i % 2 == 0) { - line += "\n"; - } - w.write(line); + System.out.println("Temp dir for writer test is: " + tmpDir); } - assertEquals(totalUncompressedBytes, w.getTotalUncompressedSize()); - assertEquals(50, w.getNumRecords()); - assertTrue("Should be at least 10 chunks in output file", w.getNumChunks() >= 10); + @Test + public void testPaths() throws Exception { + BlockGZIPFileWriter w = new BlockGZIPFileWriter("foo", tmpDir); + assertEquals(tmpDir + "/foo-000000000000.gz", w.getDataFilePath()); + assertEquals(tmpDir + "/foo-000000000000.index.json", w.getIndexFilePath()); - w.close(); - verifyOutputIsSaneGZIPFile(w.getDataFilePath(), expectedLines); - verifyIndexFile(w, 987654321, expectedLines); - } + BlockGZIPFileWriter w2 = new BlockGZIPFileWriter("foo", tmpDir, 123456); + assertEquals(tmpDir + "/foo-000000123456.gz", w2.getDataFilePath()); + assertEquals(tmpDir + "/foo-000000123456.index.json", w2.getIndexFilePath()); + } - private void verifyOutputIsSaneGZIPFile(String filename, String[] expectedRecords) throws Exception { - GZIPInputStream zip = new GZIPInputStream(new FileInputStream(filename)); - BufferedReader r = new BufferedReader(new InputStreamReader(zip, "UTF-8")); + @Test + public void testWrite() throws Exception { + // Very compressible 200 byte padding string to prepend to our unique line prefix + String pad = "0123456789012345678901234567890123456789012345678901234567890123456789012345678901234567890123456789" + + "0123456789012345678901234567890123456789012345678901234567890123456789012345678901234567890123456789"; + + // Make a writer with artificially small chunk threshold of 1kb + BlockGZIPFileWriter w = new BlockGZIPFileWriter("write-test", tmpDir, 987654321, 1000); + + int totalUncompressedBytes = 0; + String[] expectedLines = new String[50]; + // 50 records * 200 bytes padding should be at least 10 chunks worth + for (int i = 0; i < 50; i++) { + String line = String.format("Record %d %s", i, pad); + // Plus one for newline + totalUncompressedBytes += line.length() + 1; + // Expect to read without newlines... + expectedLines[i] = line; + // But add newlines to half the input to verify writer adds them only if needed + if (i % 2 == 0) { + line += "\n"; + } + w.write(line); + } + + assertEquals(totalUncompressedBytes, w.getTotalUncompressedSize()); + assertEquals(50, w.getNumRecords()); + assertTrue("Should be at least 10 chunks in output file", w.getNumChunks() >= 10); + + w.close(); + + verifyOutputIsSaneGZIPFile(w.getDataFilePath(), expectedLines); + verifyIndexFile(w, 987654321, expectedLines); + } - String line; - int i = 0; - while ((line = r.readLine()) != null) { - assertTrue( String.format("Output file has more lines than expected. Expected %d lines", expectedRecords.length) - , i < expectedRecords.length); + private void verifyOutputIsSaneGZIPFile(String filename, String[] expectedRecords) throws Exception { + GZIPInputStream zip = new GZIPInputStream(new FileInputStream(filename)); + BufferedReader r = new BufferedReader(new InputStreamReader(zip, "UTF-8")); - String expectedLine = expectedRecords[i]; - assertEquals(String.format("Output file doesn't match, first difference on line %d", i), expectedLine, line); - i++; + String line; + int i = 0; + while ((line = r.readLine()) != null) { + assertTrue(String.format("Output file has more lines than expected. 
Expected %d lines", expectedRecords.length) + , i < expectedRecords.length); + + String expectedLine = expectedRecords[i]; + assertEquals(String.format("Output file doesn't match, first difference on line %d", i), expectedLine, line); + i++; + } } - } - private void verifyIndexFile(BlockGZIPFileWriter w, int startOffset, String[] expectedRecords) throws Exception { - JSONParser parser = new JSONParser(); + private void verifyIndexFile(BlockGZIPFileWriter w, int startOffset, String[] expectedRecords) throws Exception { + JSONParser parser = new JSONParser(); + + Object obj = parser.parse(new FileReader(w.getIndexFilePath())); + JSONObject index = (JSONObject) obj; + JSONArray chunks = (JSONArray) index.get("chunks"); - Object obj = parser.parse(new FileReader(w.getIndexFilePath())); - JSONObject index = (JSONObject) obj; - JSONArray chunks = (JSONArray) index.get("chunks"); + assertEquals(w.getNumChunks(), chunks.size()); - assertEquals(w.getNumChunks(), chunks.size()); + RandomAccessFile file = new RandomAccessFile(w.getDataFilePath(), "r"); - RandomAccessFile file = new RandomAccessFile(w.getDataFilePath(), "r"); + // Check we can read all the chunks as individual gzip segments + int expectedStartOffset = startOffset; + int recordIndex = 0; + int totalBytes = 0; + int chunkIndex = 0; + for (Object chunk : chunks) { + JSONObject chunkObj = (JSONObject) chunk; + int firstOffset = (int) (long) chunkObj.get("first_record_offset"); + int numRecords = (int) (long) chunkObj.get("num_records"); + int byteOffset = (int) (long) chunkObj.get("byte_offset"); + int byteLength = (int) (long) chunkObj.get("byte_length"); - // Check we can read all the chunks as individual gzip segments - int expectedStartOffset = startOffset; - int recordIndex = 0; - int totalBytes = 0; - int chunkIndex = 0; - for (Object chunk : chunks) { - JSONObject chunkObj = (JSONObject) chunk; - int firstOffset = (int)(long) chunkObj.get("first_record_offset"); - int numRecords = (int)(long) chunkObj.get("num_records"); - int byteOffset = (int)(long) chunkObj.get("byte_offset"); - int byteLength = (int)(long) chunkObj.get("byte_length"); + assertEquals(expectedStartOffset, firstOffset); + assertTrue(byteLength > 0); + assertTrue(byteOffset >= 0); - assertEquals(expectedStartOffset, firstOffset); - assertTrue(byteLength > 0); - assertTrue(byteOffset >= 0); + // Read just that segment of the file into byte array and attempt to parse GZIP + byte[] buffer = new byte[byteLength]; + file.seek(byteOffset); + int numBytesRead = file.read(buffer); - // Read just that segment of the file into byte array and attempt to parse GZIP - byte[] buffer = new byte[byteLength]; - file.seek(byteOffset); - int numBytesRead = file.read(buffer); + assertEquals(buffer.length, numBytesRead); - assertEquals(buffer.length, numBytesRead); + GZIPInputStream zip = new GZIPInputStream(new ByteArrayInputStream(buffer)); + BufferedReader r = new BufferedReader(new InputStreamReader(zip, "UTF-8")); - GZIPInputStream zip = new GZIPInputStream(new ByteArrayInputStream(buffer)); - BufferedReader r = new BufferedReader(new InputStreamReader(zip, "UTF-8")); + int numRecordsActuallyInChunk = 0; + String line; + while ((line = r.readLine()) != null) { + assertEquals(expectedRecords[recordIndex], line); + recordIndex++; + numRecordsActuallyInChunk++; + } - int numRecordsActuallyInChunk = 0; - String line; - while ((line = r.readLine()) != null) { - assertEquals(expectedRecords[recordIndex], line); - recordIndex++; - numRecordsActuallyInChunk++; - } + 
assertEquals(numRecordsActuallyInChunk, numRecords); - assertEquals(numRecordsActuallyInChunk, numRecords); + totalBytes += byteLength; - totalBytes += byteLength; + expectedStartOffset = firstOffset + numRecords; - expectedStartOffset = firstOffset + numRecords; + chunkIndex++; + } - chunkIndex++; + assertEquals("All chunks should cover all bytes in the file", totalBytes, file.length()); } - assertEquals("All chunks should cover all bytes in the file", totalBytes, file.length()); - } + // Hmm this test is actually not very conclusive - on OS X and most linux file systems + // it passes anyway due to nature of filesystems. Not sure how to write something more robust + // though to validate that we definitiely truncate the files even if we write less data - // Hmm this test is actually not very conclusive - on OS X and most linux file systems - // it passes anyway due to nature of filesystems. Not sure how to write something more robust - // though to validate that we definitiely truncate the files even if we write less data - public void testShouldOverwrite() throws Exception { - // Make writer and write to it a bit. - { - BlockGZIPFileWriter w = new BlockGZIPFileWriter("overwrite-test", tmpDir); + @Test + public void testShouldOverwrite() throws Exception { + // Make writer and write to it a bit. + { + BlockGZIPFileWriter w = new BlockGZIPFileWriter("overwrite-test", tmpDir); - // Write at least a few 4k blocks to disk so we can be sure that we don't - // only overwrite the first block. - String[] expectedLines = new String[5000]; - for (int i = 0; i < 5000; i++) { - String line = String.format("Record %d", i); - w.write(line); - expectedLines[i] = line; - } + // Write at least a few 4k blocks to disk so we can be sure that we don't + // only overwrite the first block. 
+ String[] expectedLines = new String[5000]; + for (int i = 0; i < 5000; i++) { + String line = String.format("Record %d", i); + w.write(line); + expectedLines[i] = line; + } - assertEquals(5000, w.getNumRecords()); + assertEquals(5000, w.getNumRecords()); - w.close(); + w.close(); - // Just check it actually write to disk - verifyOutputIsSaneGZIPFile(w.getDataFilePath(), expectedLines); - verifyIndexFile(w, 0, expectedLines); + // Just check it actually write to disk + verifyOutputIsSaneGZIPFile(w.getDataFilePath(), expectedLines); + verifyIndexFile(w, 0, expectedLines); - } + } - { - // Now make a whole new writer for same chunk - BlockGZIPFileWriter w = new BlockGZIPFileWriter("overwrite-test", tmpDir); + { + // Now make a whole new writer for same chunk + BlockGZIPFileWriter w = new BlockGZIPFileWriter("overwrite-test", tmpDir); - // Only write a few lines - String[] expectedLines2 = new String[10]; - for (int i = 0; i < 10; i++) { - String line = String.format("Overwrite record %d", i); - w.write(line); - expectedLines2[i] = line; - } + // Only write a few lines + String[] expectedLines2 = new String[10]; + for (int i = 0; i < 10; i++) { + String line = String.format("Overwrite record %d", i); + w.write(line); + expectedLines2[i] = line; + } - assertEquals(10, w.getNumRecords()); + assertEquals(10, w.getNumRecords()); - w.close(); + w.close(); - // No check output is only the 10 lines we just wrote - verifyOutputIsSaneGZIPFile(w.getDataFilePath(), expectedLines2); - verifyIndexFile(w, 0, expectedLines2); + // No check output is only the 10 lines we just wrote + verifyOutputIsSaneGZIPFile(w.getDataFilePath(), expectedLines2); + verifyIndexFile(w, 0, expectedLines2); + } } - } - public void testDelete() throws Exception { - // Make writer and write to it a bit. - BlockGZIPFileWriter w = new BlockGZIPFileWriter("overwrite-test", tmpDir); + @Test + public void testDelete() throws Exception { + // Make writer and write to it a bit. 
+ BlockGZIPFileWriter w = new BlockGZIPFileWriter("overwrite-test", tmpDir); - String[] expectedLines = new String[5000]; - for (int i = 0; i < 5000; i++) { - String line = String.format("Record %d", i); - w.write(line); - expectedLines[i] = line; - } + String[] expectedLines = new String[5000]; + for (int i = 0; i < 5000; i++) { + String line = String.format("Record %d", i); + w.write(line); + expectedLines[i] = line; + } - assertEquals(5000, w.getNumRecords()); + assertEquals(5000, w.getNumRecords()); - w.close(); + w.close(); - // Just check it actually write to disk - verifyOutputIsSaneGZIPFile(w.getDataFilePath(), expectedLines); - verifyIndexFile(w, 0, expectedLines); + // Just check it actually write to disk + verifyOutputIsSaneGZIPFile(w.getDataFilePath(), expectedLines); + verifyIndexFile(w, 0, expectedLines); - // Now remove it - w.delete(); + // Now remove it + w.delete(); - File dataF = new File(w.getDataFilePath()); - File idxF = new File(w.getIndexFilePath()); + File dataF = new File(w.getDataFilePath()); + File idxF = new File(w.getIndexFilePath()); - assertFalse("Data file should not exist after delete", dataF.exists()); - assertFalse("Index file should not exist after delete", idxF.exists()); - } + assertFalse("Data file should not exist after delete", dataF.exists()); + assertFalse("Index file should not exist after delete", idxF.exists()); + } } From 4fa1a654bad38fcdf9d68c57666efe2e9da33ed5 Mon Sep 17 00:00:00 2001 From: R Vadai Date: Mon, 3 Apr 2017 12:01:13 +0100 Subject: [PATCH 04/11] Refactored tests --- .../BlockBZIP2FileWriterTest.java | 119 ++++------------ .../BlockFileWriterTestCommon.java | 94 ++++++++++++- .../BlockGZIPFileWriterTest.java | 130 +++++------------- 3 files changed, 156 insertions(+), 187 deletions(-) diff --git a/src/test/java/com/deviantart/kafka_connect_s3/BlockBZIP2FileWriterTest.java b/src/test/java/com/deviantart/kafka_connect_s3/BlockBZIP2FileWriterTest.java index e0c1853..263d94d 100644 --- a/src/test/java/com/deviantart/kafka_connect_s3/BlockBZIP2FileWriterTest.java +++ b/src/test/java/com/deviantart/kafka_connect_s3/BlockBZIP2FileWriterTest.java @@ -1,19 +1,12 @@ package com.deviantart.kafka_connect_s3; import org.apache.commons.compress.compressors.bzip2.BZip2CompressorInputStream; -import org.json.simple.JSONArray; -import org.json.simple.JSONObject; -import org.json.simple.parser.JSONParser; import org.junit.BeforeClass; import org.junit.Test; -import java.io.BufferedReader; -import java.io.ByteArrayInputStream; import java.io.File; -import java.io.FileInputStream; -import java.io.FileReader; -import java.io.InputStreamReader; -import java.io.RandomAccessFile; +import java.io.IOException; +import java.io.InputStream; import static junit.framework.TestCase.assertTrue; import static org.junit.Assert.assertEquals; @@ -32,14 +25,33 @@ public static void oneTimeSetUp() { System.out.println("Temp dir for writer test is: " + tmpDir); } + @Override + protected BlockFileWriter newBlockFileWriter(String filenameBase, String path) throws IOException { + return new BlockBZIP2FileWriter(filenameBase, path); + } + + @Override + protected BlockFileWriter newBlockFileWriter(String filenameBase, String path, long firstRecordOffset) throws IOException { + return new BlockBZIP2FileWriter(filenameBase, path, firstRecordOffset); + } + + @Override + protected BlockFileWriter newBlockFileWriter(String filenameBase, String path, long firstRecordOffset, long chunkThreshold) throws IOException { + return new BlockBZIP2FileWriter(filenameBase, path, 
firstRecordOffset, chunkThreshold); + } + + protected InputStream newCompressorInputStream(InputStream in) throws IOException { + return new BZip2CompressorInputStream(in); + } + @Test public void testPaths() throws Exception { - BlockBZIP2FileWriter w = new BlockBZIP2FileWriter("foo", tmpDir); + BlockFileWriter w = newBlockFileWriter("foo", tmpDir); assertEquals(tmpDir + "/foo-000000000000.bzip2", w.getDataFilePath()); assertEquals(tmpDir + "/foo-000000000000.index.json", w.getIndexFilePath()); - BlockBZIP2FileWriter w2 = new BlockBZIP2FileWriter("foo", tmpDir, 123456); + BlockFileWriter w2 = newBlockFileWriter("foo", tmpDir, 123456); assertEquals(tmpDir + "/foo-000000123456.bzip2", w2.getDataFilePath()); assertEquals(tmpDir + "/foo-000000123456.index.json", w2.getIndexFilePath()); } @@ -51,7 +63,7 @@ public void testWrite() throws Exception { + "0123456789012345678901234567890123456789012345678901234567890123456789012345678901234567890123456789"; // Make a writer with artificially small chunk threshold of 1kb - BlockBZIP2FileWriter w = new BlockBZIP2FileWriter("write-test", tmpDir, 987654321, 1000); + BlockFileWriter w = newBlockFileWriter("write-test", tmpDir, 987654321, 1000); int totalUncompressedBytes = 0; String[] expectedLines = new String[50]; @@ -75,82 +87,11 @@ public void testWrite() throws Exception { w.close(); - verifyOutputIsSaneBZIP2File(w.getDataFilePath(), expectedLines); + verifyOutputIsSaneCompressedFile(w.getDataFilePath(), expectedLines); verifyIndexFile(w, 987654321, expectedLines); } - private void verifyOutputIsSaneBZIP2File(String filename, String[] expectedRecords) throws Exception { - BZip2CompressorInputStream zip = new BZip2CompressorInputStream(new FileInputStream(filename)); - BufferedReader r = new BufferedReader(new InputStreamReader(zip, "UTF-8")); - - String line; - int i = 0; - while ((line = r.readLine()) != null) { - assertTrue(String.format("Output file has more lines than expected. 
Expected %d lines", expectedRecords.length) - , i < expectedRecords.length); - String expectedLine = expectedRecords[i]; - assertEquals(String.format("Output file doesn't match, first difference on line %d", i), expectedLine, line); - i++; - } - } - - private void verifyIndexFile(BlockBZIP2FileWriter w, int startOffset, String[] expectedRecords) throws Exception { - JSONParser parser = new JSONParser(); - - Object obj = parser.parse(new FileReader(w.getIndexFilePath())); - JSONObject index = (JSONObject) obj; - JSONArray chunks = (JSONArray) index.get("chunks"); - - assertEquals(w.getNumChunks(), chunks.size()); - - RandomAccessFile file = new RandomAccessFile(w.getDataFilePath(), "r"); - - // Check we can read all the chunks as individual bzip2 segments - int expectedStartOffset = startOffset; - int recordIndex = 0; - int totalBytes = 0; - int chunkIndex = 0; - for (Object chunk : chunks) { - JSONObject chunkObj = (JSONObject) chunk; - int firstOffset = (int) (long) chunkObj.get("first_record_offset"); - int numRecords = (int) (long) chunkObj.get("num_records"); - int byteOffset = (int) (long) chunkObj.get("byte_offset"); - int byteLength = (int) (long) chunkObj.get("byte_length"); - - assertEquals(expectedStartOffset, firstOffset); - assertTrue(byteLength > 0); - assertTrue(byteOffset >= 0); - - // Read just that segment of the file into byte array and attempt to parse BZIP2 - byte[] buffer = new byte[byteLength]; - file.seek(byteOffset); - int numBytesRead = file.read(buffer); - - assertEquals(buffer.length, numBytesRead); - - BZip2CompressorInputStream zip = new BZip2CompressorInputStream(new ByteArrayInputStream(buffer)); - BufferedReader r = new BufferedReader(new InputStreamReader(zip, "UTF-8")); - - int numRecordsActuallyInChunk = 0; - String line; - while ((line = r.readLine()) != null) { - assertEquals(expectedRecords[recordIndex], line); - recordIndex++; - numRecordsActuallyInChunk++; - } - - assertEquals(numRecordsActuallyInChunk, numRecords); - - totalBytes += byteLength; - - expectedStartOffset = firstOffset + numRecords; - - chunkIndex++; - } - - assertEquals("All chunks should cover all bytes in the file", totalBytes, file.length()); - } // Hmm this test is actually not very conclusive - on OS X and most linux file systems // it passes anyway due to nature of filesystems. Not sure how to write something more robust @@ -176,14 +117,14 @@ public void testShouldOverwrite() throws Exception { w.close(); // Just check it actually write to disk - verifyOutputIsSaneBZIP2File(w.getDataFilePath(), expectedLines); + verifyOutputIsSaneCompressedFile(w.getDataFilePath(), expectedLines); verifyIndexFile(w, 0, expectedLines); } { // Now make a whole new writer for same chunk - BlockBZIP2FileWriter w = new BlockBZIP2FileWriter("overwrite-test", tmpDir); + BlockFileWriter w = newBlockFileWriter("overwrite-test", tmpDir); // Only write a few lines String[] expectedLines2 = new String[10]; @@ -198,7 +139,7 @@ public void testShouldOverwrite() throws Exception { w.close(); // No check output is only the 10 lines we just wrote - verifyOutputIsSaneBZIP2File(w.getDataFilePath(), expectedLines2); + verifyOutputIsSaneCompressedFile(w.getDataFilePath(), expectedLines2); verifyIndexFile(w, 0, expectedLines2); } } @@ -206,7 +147,7 @@ public void testShouldOverwrite() throws Exception { @Test public void testDelete() throws Exception { // Make writer and write to it a bit. 
- BlockBZIP2FileWriter w = new BlockBZIP2FileWriter("overwrite-test", tmpDir); + BlockFileWriter w = newBlockFileWriter("overwrite-test", tmpDir); String[] expectedLines = new String[5000]; for (int i = 0; i < 5000; i++) { @@ -220,7 +161,7 @@ public void testDelete() throws Exception { w.close(); // Just check it actually write to disk - verifyOutputIsSaneBZIP2File(w.getDataFilePath(), expectedLines); + verifyOutputIsSaneCompressedFile(w.getDataFilePath(), expectedLines); verifyIndexFile(w, 0, expectedLines); // Now remove it diff --git a/src/test/java/com/deviantart/kafka_connect_s3/BlockFileWriterTestCommon.java b/src/test/java/com/deviantart/kafka_connect_s3/BlockFileWriterTestCommon.java index cf04aee..ffb0950 100644 --- a/src/test/java/com/deviantart/kafka_connect_s3/BlockFileWriterTestCommon.java +++ b/src/test/java/com/deviantart/kafka_connect_s3/BlockFileWriterTestCommon.java @@ -1,12 +1,24 @@ package com.deviantart.kafka_connect_s3; +import org.json.simple.JSONArray; +import org.json.simple.JSONObject; +import org.json.simple.parser.JSONParser; import org.junit.Before; +import java.io.BufferedReader; +import java.io.ByteArrayInputStream; import java.io.File; +import java.io.FileInputStream; +import java.io.FileReader; +import java.io.IOException; +import java.io.InputStream; +import java.io.InputStreamReader; +import java.io.RandomAccessFile; -public abstract class BlockFileWriterTestCommon { +import static junit.framework.TestCase.assertTrue; +import static org.junit.Assert.assertEquals; - protected Class compressedFileWriterClass; +public abstract class BlockFileWriterTestCommon { protected static String tmpDir; @@ -18,4 +30,82 @@ public void setUp() throws Exception { f.mkdir(); } } + + abstract protected BlockFileWriter newBlockFileWriter(String filenameBase, String path) throws IOException; + abstract protected BlockFileWriter newBlockFileWriter(String filenameBase, String path, long firstRecordOffset) throws IOException; + abstract protected BlockFileWriter newBlockFileWriter(String filenameBase, String path, long firstRecordOffset, long chunkThreshold) throws IOException; + abstract protected InputStream newCompressorInputStream(InputStream in) throws IOException; + + protected void verifyOutputIsSaneCompressedFile(String filename, String[] expectedRecords) throws Exception { + InputStream zip = newCompressorInputStream(new FileInputStream(filename)); + BufferedReader r = new BufferedReader(new InputStreamReader(zip, "UTF-8")); + + String line; + int i = 0; + while ((line = r.readLine()) != null) { + assertTrue(String.format("Output file has more lines than expected. 
Expected %d lines", expectedRecords.length) + , i < expectedRecords.length); + + String expectedLine = expectedRecords[i]; + assertEquals(String.format("Output file doesn't match, first difference on line %d", i), expectedLine, line); + i++; + } + } + + protected void verifyIndexFile(BlockFileWriter w, int startOffset, String[] expectedRecords) throws Exception { + JSONParser parser = new JSONParser(); + + Object obj = parser.parse(new FileReader(w.getIndexFilePath())); + JSONObject index = (JSONObject) obj; + JSONArray chunks = (JSONArray) index.get("chunks"); + + assertEquals(w.getNumChunks(), chunks.size()); + + RandomAccessFile file = new RandomAccessFile(w.getDataFilePath(), "r"); + + // Check we can read all the chunks as individual bzip2 segments + int expectedStartOffset = startOffset; + int recordIndex = 0; + int totalBytes = 0; + int chunkIndex = 0; + for (Object chunk : chunks) { + JSONObject chunkObj = (JSONObject) chunk; + int firstOffset = (int) (long) chunkObj.get("first_record_offset"); + int numRecords = (int) (long) chunkObj.get("num_records"); + int byteOffset = (int) (long) chunkObj.get("byte_offset"); + int byteLength = (int) (long) chunkObj.get("byte_length"); + + assertEquals(expectedStartOffset, firstOffset); + assertTrue(byteLength > 0); + assertTrue(byteOffset >= 0); + + // Read just that segment of the file into byte array and attempt to parse BZIP2 + byte[] buffer = new byte[byteLength]; + file.seek(byteOffset); + int numBytesRead = file.read(buffer); + + assertEquals(buffer.length, numBytesRead); + + InputStream zip = newCompressorInputStream(new ByteArrayInputStream(buffer)); + BufferedReader r = new BufferedReader(new InputStreamReader(zip, "UTF-8")); + + int numRecordsActuallyInChunk = 0; + String line; + while ((line = r.readLine()) != null) { + assertEquals(expectedRecords[recordIndex], line); + recordIndex++; + numRecordsActuallyInChunk++; + } + + assertEquals(numRecordsActuallyInChunk, numRecords); + + totalBytes += byteLength; + + expectedStartOffset = firstOffset + numRecords; + + chunkIndex++; + } + + assertEquals("All chunks should cover all bytes in the file", totalBytes, file.length()); + } } diff --git a/src/test/java/com/deviantart/kafka_connect_s3/BlockGZIPFileWriterTest.java b/src/test/java/com/deviantart/kafka_connect_s3/BlockGZIPFileWriterTest.java index ca54181..28820c8 100644 --- a/src/test/java/com/deviantart/kafka_connect_s3/BlockGZIPFileWriterTest.java +++ b/src/test/java/com/deviantart/kafka_connect_s3/BlockGZIPFileWriterTest.java @@ -1,21 +1,13 @@ package com.deviantart.kafka_connect_s3; -import java.io.BufferedReader; -import java.io.ByteArrayInputStream; -import java.io.File; -import java.io.FileInputStream; -import java.io.FileReader; -import java.io.InputStreamReader; -import java.io.RandomAccessFile; -import java.util.zip.GZIPInputStream; - -import org.json.simple.JSONArray; -import org.json.simple.JSONObject; -import org.json.simple.parser.JSONParser; -import org.junit.Before; import org.junit.BeforeClass; import org.junit.Test; +import java.io.File; +import java.io.IOException; +import java.io.InputStream; +import java.util.zip.GZIPInputStream; + import static junit.framework.TestCase.assertTrue; import static org.junit.Assert.assertEquals; import static org.junit.Assert.assertFalse; @@ -33,14 +25,33 @@ public static void oneTimeSetUp() { System.out.println("Temp dir for writer test is: " + tmpDir); } + @Override + protected BlockFileWriter newBlockFileWriter(String filenameBase, String path) throws IOException { + return new 
BlockGZIPFileWriter(filenameBase, path); + } + + @Override + protected BlockFileWriter newBlockFileWriter(String filenameBase, String path, long firstRecordOffset) throws IOException { + return new BlockGZIPFileWriter(filenameBase, path, firstRecordOffset); + } + + @Override + protected BlockFileWriter newBlockFileWriter(String filenameBase, String path, long firstRecordOffset, long chunkThreshold) throws IOException { + return new BlockGZIPFileWriter(filenameBase, path, firstRecordOffset, chunkThreshold); + } + + protected InputStream newCompressorInputStream(InputStream in) throws IOException { + return new GZIPInputStream(in); + } + @Test public void testPaths() throws Exception { - BlockGZIPFileWriter w = new BlockGZIPFileWriter("foo", tmpDir); + BlockFileWriter w = newBlockFileWriter("foo", tmpDir); assertEquals(tmpDir + "/foo-000000000000.gz", w.getDataFilePath()); assertEquals(tmpDir + "/foo-000000000000.index.json", w.getIndexFilePath()); - BlockGZIPFileWriter w2 = new BlockGZIPFileWriter("foo", tmpDir, 123456); + BlockFileWriter w2 = newBlockFileWriter("foo", tmpDir, 123456); assertEquals(tmpDir + "/foo-000000123456.gz", w2.getDataFilePath()); assertEquals(tmpDir + "/foo-000000123456.index.json", w2.getIndexFilePath()); } @@ -52,7 +63,7 @@ public void testWrite() throws Exception { + "0123456789012345678901234567890123456789012345678901234567890123456789012345678901234567890123456789"; // Make a writer with artificially small chunk threshold of 1kb - BlockGZIPFileWriter w = new BlockGZIPFileWriter("write-test", tmpDir, 987654321, 1000); + BlockFileWriter w = newBlockFileWriter("write-test", tmpDir, 987654321, 1000); int totalUncompressedBytes = 0; String[] expectedLines = new String[50]; @@ -76,83 +87,10 @@ public void testWrite() throws Exception { w.close(); - verifyOutputIsSaneGZIPFile(w.getDataFilePath(), expectedLines); + verifyOutputIsSaneCompressedFile(w.getDataFilePath(), expectedLines); verifyIndexFile(w, 987654321, expectedLines); } - private void verifyOutputIsSaneGZIPFile(String filename, String[] expectedRecords) throws Exception { - GZIPInputStream zip = new GZIPInputStream(new FileInputStream(filename)); - BufferedReader r = new BufferedReader(new InputStreamReader(zip, "UTF-8")); - - String line; - int i = 0; - while ((line = r.readLine()) != null) { - assertTrue(String.format("Output file has more lines than expected. 
Expected %d lines", expectedRecords.length) - , i < expectedRecords.length); - - String expectedLine = expectedRecords[i]; - assertEquals(String.format("Output file doesn't match, first difference on line %d", i), expectedLine, line); - i++; - } - } - - private void verifyIndexFile(BlockGZIPFileWriter w, int startOffset, String[] expectedRecords) throws Exception { - JSONParser parser = new JSONParser(); - - Object obj = parser.parse(new FileReader(w.getIndexFilePath())); - JSONObject index = (JSONObject) obj; - JSONArray chunks = (JSONArray) index.get("chunks"); - - assertEquals(w.getNumChunks(), chunks.size()); - - RandomAccessFile file = new RandomAccessFile(w.getDataFilePath(), "r"); - - // Check we can read all the chunks as individual gzip segments - int expectedStartOffset = startOffset; - int recordIndex = 0; - int totalBytes = 0; - int chunkIndex = 0; - for (Object chunk : chunks) { - JSONObject chunkObj = (JSONObject) chunk; - int firstOffset = (int) (long) chunkObj.get("first_record_offset"); - int numRecords = (int) (long) chunkObj.get("num_records"); - int byteOffset = (int) (long) chunkObj.get("byte_offset"); - int byteLength = (int) (long) chunkObj.get("byte_length"); - - assertEquals(expectedStartOffset, firstOffset); - assertTrue(byteLength > 0); - assertTrue(byteOffset >= 0); - - // Read just that segment of the file into byte array and attempt to parse GZIP - byte[] buffer = new byte[byteLength]; - file.seek(byteOffset); - int numBytesRead = file.read(buffer); - - assertEquals(buffer.length, numBytesRead); - - GZIPInputStream zip = new GZIPInputStream(new ByteArrayInputStream(buffer)); - BufferedReader r = new BufferedReader(new InputStreamReader(zip, "UTF-8")); - - int numRecordsActuallyInChunk = 0; - String line; - while ((line = r.readLine()) != null) { - assertEquals(expectedRecords[recordIndex], line); - recordIndex++; - numRecordsActuallyInChunk++; - } - - assertEquals(numRecordsActuallyInChunk, numRecords); - - totalBytes += byteLength; - - expectedStartOffset = firstOffset + numRecords; - - chunkIndex++; - } - - assertEquals("All chunks should cover all bytes in the file", totalBytes, file.length()); - } - // Hmm this test is actually not very conclusive - on OS X and most linux file systems // it passes anyway due to nature of filesystems. Not sure how to write something more robust // though to validate that we definitiely truncate the files even if we write less data @@ -161,7 +99,7 @@ private void verifyIndexFile(BlockGZIPFileWriter w, int startOffset, String[] ex public void testShouldOverwrite() throws Exception { // Make writer and write to it a bit. { - BlockGZIPFileWriter w = new BlockGZIPFileWriter("overwrite-test", tmpDir); + BlockFileWriter w = newBlockFileWriter("overwrite-test", tmpDir); // Write at least a few 4k blocks to disk so we can be sure that we don't // only overwrite the first block. 
@@ -177,14 +115,14 @@ public void testShouldOverwrite() throws Exception { w.close(); // Just check it actually write to disk - verifyOutputIsSaneGZIPFile(w.getDataFilePath(), expectedLines); + verifyOutputIsSaneCompressedFile(w.getDataFilePath(), expectedLines); verifyIndexFile(w, 0, expectedLines); } { // Now make a whole new writer for same chunk - BlockGZIPFileWriter w = new BlockGZIPFileWriter("overwrite-test", tmpDir); + BlockFileWriter w = newBlockFileWriter("overwrite-test", tmpDir); // Only write a few lines String[] expectedLines2 = new String[10]; @@ -199,7 +137,7 @@ public void testShouldOverwrite() throws Exception { w.close(); // No check output is only the 10 lines we just wrote - verifyOutputIsSaneGZIPFile(w.getDataFilePath(), expectedLines2); + verifyOutputIsSaneCompressedFile(w.getDataFilePath(), expectedLines2); verifyIndexFile(w, 0, expectedLines2); } } @@ -207,7 +145,7 @@ public void testShouldOverwrite() throws Exception { @Test public void testDelete() throws Exception { // Make writer and write to it a bit. - BlockGZIPFileWriter w = new BlockGZIPFileWriter("overwrite-test", tmpDir); + BlockFileWriter w = newBlockFileWriter("overwrite-test", tmpDir); String[] expectedLines = new String[5000]; for (int i = 0; i < 5000; i++) { @@ -221,7 +159,7 @@ public void testDelete() throws Exception { w.close(); // Just check it actually write to disk - verifyOutputIsSaneGZIPFile(w.getDataFilePath(), expectedLines); + verifyOutputIsSaneCompressedFile(w.getDataFilePath(), expectedLines); verifyIndexFile(w, 0, expectedLines); // Now remove it From fbc4803d45983ad3191814cf264db84520f1c856 Mon Sep 17 00:00:00 2001 From: R Vadai Date: Mon, 3 Apr 2017 12:12:05 +0100 Subject: [PATCH 05/11] Added compression type config --- .../kafka_connect_s3/S3SinkConnector.java | 14 +++++++---- .../kafka_connect_s3/S3SinkTask.java | 24 ++++++++++++------- 2 files changed, 25 insertions(+), 13 deletions(-) diff --git a/src/main/java/com/deviantart/kafka_connect_s3/S3SinkConnector.java b/src/main/java/com/deviantart/kafka_connect_s3/S3SinkConnector.java index 649c8b3..385e18f 100644 --- a/src/main/java/com/deviantart/kafka_connect_s3/S3SinkConnector.java +++ b/src/main/java/com/deviantart/kafka_connect_s3/S3SinkConnector.java @@ -36,13 +36,16 @@ public class S3SinkConnector extends SinkConnector { public static final String BUFFER_DIRECTORY_PATH_CONFIG = "local.buffer.dir"; + public static final String COMPRESSION_TYPE = "compression.type"; + public static final ConfigDef CONFIG_DEF = new ConfigDef() - .define(MAX_BLOCK_SIZE_CONFIG, Type.LONG, DEFAULT_MAX_BLOCK_SIZE, Range.atLeast(0), Importance.LOW, "Maximum size of data chunks in bytes (before compression)") + .define(MAX_BLOCK_SIZE_CONFIG, Type.LONG, DEFAULT_MAX_BLOCK_SIZE, Range.atLeast(0), Importance.LOW, "Maximum size of data chunks in bytes (before compression).") .define(S3_BUCKET_CONFIG, Type.STRING, Importance.HIGH, "Name of the S3 bucket") - .define(S3_PREFIX_CONFIG, Type.STRING, "", Importance.HIGH, "Path prefix of files to be written to S3") - .define(OVERRIDE_S3_ENDPOINT_CONFIG, Type.STRING, "", Importance.LOW, "Override the S3 URL endpoint") - .define(S3_PATHSTYLE_CONFIG, Type.BOOLEAN, false, Importance.LOW, "Override the standard S3 URL style by placing the bucket name in the path instead of hostname") - .define(BUFFER_DIRECTORY_PATH_CONFIG, Type.STRING, Importance.HIGH, "Path to directory to store data temporarily before uploading to S3") + .define(S3_PREFIX_CONFIG, Type.STRING, "", Importance.HIGH, "Path prefix of files to be written 
to S3.") + .define(OVERRIDE_S3_ENDPOINT_CONFIG, Type.STRING, "", Importance.LOW, "Override the S3 URL endpoint.") + .define(S3_PATHSTYLE_CONFIG, Type.BOOLEAN, false, Importance.LOW, "Override the standard S3 URL style by placing the bucket name in the path instead of hostname.") + .define(BUFFER_DIRECTORY_PATH_CONFIG, Type.STRING, Importance.HIGH, "Path to directory to store data temporarily before uploading to S3.") + .define(COMPRESSION_TYPE, Type.STRING, "gzip", Importance.HIGH, "The compression type to use, gzip and bzip2 supported. Defaults to gzip.") ; private Map configProperties; @@ -73,6 +76,7 @@ public List> taskConfigs(int maxTasks) { props.put(OVERRIDE_S3_ENDPOINT_CONFIG, configProperties.get(OVERRIDE_S3_ENDPOINT_CONFIG).toString()); props.put(S3_PATHSTYLE_CONFIG, configProperties.get(S3_PATHSTYLE_CONFIG).toString()); props.put(BUFFER_DIRECTORY_PATH_CONFIG, configProperties.get(BUFFER_DIRECTORY_PATH_CONFIG).toString()); + props.put(COMPRESSION_TYPE, configProperties.get(COMPRESSION_TYPE).toString()); configs.add(props); } return configs; diff --git a/src/main/java/com/deviantart/kafka_connect_s3/S3SinkTask.java b/src/main/java/com/deviantart/kafka_connect_s3/S3SinkTask.java index e3010c1..fca67a2 100644 --- a/src/main/java/com/deviantart/kafka_connect_s3/S3SinkTask.java +++ b/src/main/java/com/deviantart/kafka_connect_s3/S3SinkTask.java @@ -39,7 +39,9 @@ public class S3SinkTask extends SinkTask { private String bufferDirectoryPath; - private Map tmpFiles; + private String compressionType; + + private Map tmpFiles; private Long maxBlockSize; @@ -63,6 +65,7 @@ private void readConfig(Map props) { overrideS3Endpoint = (String)config.get(S3SinkConnector.OVERRIDE_S3_ENDPOINT_CONFIG); s3PathStyle = (Boolean)config.get(S3SinkConnector.S3_PATHSTYLE_CONFIG); bufferDirectoryPath = (String)config.get(S3SinkConnector.BUFFER_DIRECTORY_PATH_CONFIG); + compressionType = (String)config.get(S3SinkConnector.COMPRESSION_TYPE); } @Override @@ -102,7 +105,7 @@ public void put(Collection records) throws ConnectException { String topic = record.topic(); int partition = record.kafkaPartition(); TopicPartition tp = new TopicPartition(topic, partition); - BlockGZIPFileWriter buffer = tmpFiles.get(tp); + BlockFileWriter buffer = tmpFiles.get(tp); if (buffer == null) { log.error("Trying to put {} records to partition {} which doesn't exist yet", records.size(), tp); throw new ConnectException("Trying to put records for a topic partition that has not be assigned"); @@ -121,9 +124,9 @@ public void flush(Map offsets) throws Connect // https://twitter.com/mr_paul_banks/status/702493772983177218 // Instead iterate over the writers we do have and get the offsets directly from them. 
- for (Map.Entry entry : tmpFiles.entrySet()) { + for (Map.Entry entry : tmpFiles.entrySet()) { TopicPartition tp = entry.getKey(); - BlockGZIPFileWriter writer = entry.getValue(); + BlockFileWriter writer = entry.getValue(); if (writer.getNumRecords() == 0) { // Not done anything yet log.info("No new records for partition {}", tp); @@ -145,9 +148,14 @@ public void flush(Map offsets) throws Connect } } - private BlockGZIPFileWriter createNextBlockWriter(TopicPartition tp, long nextOffset) throws ConnectException, IOException { + private BlockFileWriter createNextBlockWriter(TopicPartition tp, long nextOffset) throws ConnectException, IOException { String name = String.format("%s-%05d", tp.topic(), tp.partition()); - return new BlockGZIPFileWriter(name, bufferDirectoryPath, nextOffset, maxBlockSize); + + if (compressionType == "bzip2") { + return new BlockBZIP2FileWriter(name, bufferDirectoryPath, nextOffset, maxBlockSize); + } else { + return new BlockGZIPFileWriter(name, bufferDirectoryPath, nextOffset, maxBlockSize); + } } @Override @@ -159,7 +167,7 @@ public void onPartitionsAssigned(Collection partitions) throws C public void onPartitionsRevoked(Collection partitions) throws ConnectException { for (TopicPartition tp : partitions) { // See if this is a new assignment - BlockGZIPFileWriter writer = this.tmpFiles.remove(tp); + BlockFileWriter writer = this.tmpFiles.remove(tp); if (writer != null) { log.info("Revoked partition {} deleting buffer", tp); try { @@ -194,7 +202,7 @@ private void recoverPartition(TopicPartition tp) throws IOException { log.info("Recovering partition {} from offset {}", tp, offset); - BlockGZIPFileWriter w = createNextBlockWriter(tp, offset); + BlockFileWriter w = createNextBlockWriter(tp, offset); tmpFiles.put(tp, w); this.context.offset(tp, offset); From 74f84171d2e19b3fe0a9c668f99c0bf4033f02b5 Mon Sep 17 00:00:00 2001 From: R Vadai Date: Mon, 3 Apr 2017 17:12:56 +0100 Subject: [PATCH 06/11] Fixed handling bzip2 writer --- src/main/java/com/deviantart/kafka_connect_s3/S3SinkTask.java | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/src/main/java/com/deviantart/kafka_connect_s3/S3SinkTask.java b/src/main/java/com/deviantart/kafka_connect_s3/S3SinkTask.java index fca67a2..74b566d 100644 --- a/src/main/java/com/deviantart/kafka_connect_s3/S3SinkTask.java +++ b/src/main/java/com/deviantart/kafka_connect_s3/S3SinkTask.java @@ -151,7 +151,7 @@ public void flush(Map offsets) throws Connect private BlockFileWriter createNextBlockWriter(TopicPartition tp, long nextOffset) throws ConnectException, IOException { String name = String.format("%s-%05d", tp.topic(), tp.partition()); - if (compressionType == "bzip2") { + if (compressionType.equals("bzip2")) { return new BlockBZIP2FileWriter(name, bufferDirectoryPath, nextOffset, maxBlockSize); } else { return new BlockGZIPFileWriter(name, bufferDirectoryPath, nextOffset, maxBlockSize); From 76420730cfed5eb8dbac00252794a2d1ed468208 Mon Sep 17 00:00:00 2001 From: R Vadai Date: Mon, 3 Apr 2017 17:15:58 +0100 Subject: [PATCH 07/11] Updated README and version --- README.md | 7 +++---- pom.xml | 2 +- 2 files changed, 4 insertions(+), 5 deletions(-) diff --git a/README.md b/README.md index 516d939..de0a346 100644 --- a/README.md +++ b/README.md @@ -86,11 +86,9 @@ $ cat system-test-00000-000000000000.index.json | jq -M '.' 
- Depending on your needs you can either limit to just the single block, or if you want to consume all records after that offset, you can consume from the offset right to the end of the file - The range request bytes can be decompressed as a GZIP file on their own with any GZIP compatible tool, provided you limit to whole block boundaries. -## Other Formats +## BZip2 format -For now we only support Block-GZIP output. This assumes that all your kafka messages can be output as newline-delimited text files. - -We could make the output format pluggable if others have use for this connector, but need binary serialisation formats like Avro/Thrift/Protobuf etc. Pull requests welcome. +Works exactly the same way as the Block-GZIP output format, see above. ## Build and Run @@ -113,6 +111,7 @@ In addition to the [standard kafka-connect config options](http://kafka.apache.o | s3.endpoint | AWS defaults per region | Mostly useful for testing. | | s3.path_style | `false` | Force path-style access to bucket rather than subdomain. Mostly useful for tests. | | compressed_block_size | 67108864 | How much _uncompressed_ data to write to the file before we rol to a new block/chunk. See [Block-GZIP](#user-content-block-gzip-output-format) section above. | +| compression.type | `gzip` | The compression algorithm, either `gzip` or `bzip2`. | Note that we use the default AWS SDK credentials provider. [Refer to their docs](http://docs.aws.amazon.com/AWSSdkDocsJava/latest/DeveloperGuide/credentials.html#id1) for the options for configuring S3 credentials. diff --git a/pom.xml b/pom.xml index ec5715e..49508fa 100644 --- a/pom.xml +++ b/pom.xml @@ -4,7 +4,7 @@ com.deviantart kafka-connect-s3 - 0.0.3 + 0.0.4 jar kafka-connect-s3 From 6e8a7a9ad90a0bf82ac81d958f96d52220f8fe02 Mon Sep 17 00:00:00 2001 From: R Vadai Date: Mon, 3 Apr 2017 17:21:36 +0100 Subject: [PATCH 08/11] Added back example-connect-s3-sink.properties --- example-connect-s3-sink.properties | 8 ++++++++ 1 file changed, 8 insertions(+) create mode 100644 example-connect-s3-sink.properties diff --git a/example-connect-s3-sink.properties b/example-connect-s3-sink.properties new file mode 100644 index 0000000..494d3b6 --- /dev/null +++ b/example-connect-s3-sink.properties @@ -0,0 +1,8 @@ +name=s3-sink +connector.class=com.deviantart.kafka_connect_s3.S3SinkConnector +tasks.max=1 +topics=test +s3.bucket=connect-test +s3.prefix=connect-test +local.buffer.dir=/tmp/kafka-connect-s3.buffer +compression.type=gzip From 605dd23c45044a070909559f82db8512b74b39ae Mon Sep 17 00:00:00 2001 From: R Vadai Date: Mon, 3 Apr 2017 17:24:51 +0100 Subject: [PATCH 09/11] Updated README section on testing --- README.md | 17 ++--------------- 1 file changed, 2 insertions(+), 15 deletions(-) diff --git a/README.md b/README.md index de0a346..b025195 100644 --- a/README.md +++ b/README.md @@ -117,18 +117,5 @@ Note that we use the default AWS SDK credentials provider. [Refer to their docs] ## Testing -Most of the custom logic for handling output formatting, and managing S3 has reasonable mocked unit tests. There are probably improvements that can be made, but the logic is not especially complex. - -There is also a basic system test to validate the integration with kafka-connect. This is not complete nor is it 100% deterministic due to the vagaries of multiple systems with non-deterministic things like timers effecting behaviour. 
-But it does consistently pass when run by hand on my Mac and validates basic operation of:
-
- - Initialisation and consuming/flushing all expected data
- - Resuming correctly on restart based on S3 state not relying on local disk state
- - Reconfiguring partitions of a topic and correctly resuming each
-
-It doesn't test distributed mode operation yet, however the above is enough to exercise all of the integration points with the kafka-connect runtime.
-
-### System Test Setup
-
-See [the README in the system_test dir](/system_test/README.md) for details on setting up dependencies and environment to run the tests.
+Integration tests can be run using Docker.
+Simply run the `run-integration-tests.sh` script on a Mac/Linux system.

From 9ba1eb4f29c705bba8cc69303f61b46bdb07df97 Mon Sep 17 00:00:00 2001
From: R Vadai
Date: Thu, 4 May 2017 11:28:46 +0100
Subject: [PATCH 10/11] Fixed setting host IP on Linux for integration tests

---
 run-integration-tests.sh | 4 ++--
 1 file changed, 2 insertions(+), 2 deletions(-)

diff --git a/run-integration-tests.sh b/run-integration-tests.sh
index 29770bb..d1b599b 100755
--- a/run-integration-tests.sh
+++ b/run-integration-tests.sh
@@ -5,7 +5,7 @@ set -e
 trap "docker-compose down" EXIT
 
 if uname | grep -q Linux; then
-    export DOCKER_BIND_IP=$(ip route | awk '/default/ { print $3 }')
+    export DOCKER_BIND_IP=$(ip addr | grep 'eth0:' -A2 | tail -n1 | awk '{print $2}' | cut -f1 -d'/')
 elif uname | grep -q Darwin; then
     export DOCKER_BIND_IP=$(ifconfig en0 | grep inet | grep -v inet6 | awk '{print $2}')
 else
@@ -32,7 +32,7 @@ kafka-topics --zookeeper zookeeper:2181 \
     --partitions 3 \
     --replication-factor 1
 
-sleep 5
+sleep 10
 
 # Submitting JSON configuration
 curl -H "Content-Type: application/json" \

From 118a6d3f02e3e8720f6e864c3b30f70eb9d98d90 Mon Sep 17 00:00:00 2001
From: R Vadai
Date: Tue, 9 May 2017 15:59:41 +0100
Subject: [PATCH 11/11] Removed line that says only Gzip output format is available

---
 README.md | 2 +-
 1 file changed, 1 insertion(+), 1 deletion(-)

diff --git a/README.md b/README.md
index b025195..7bd6b50 100644
--- a/README.md
+++ b/README.md
@@ -14,7 +14,7 @@ This was built against Kafka 0.10.1.1.
 
 ## Block-GZIP Output Format
 
-For now there is just one output format which is essentially just a GZIPed text file with one Kafka message per line.
+This format is essentially just a GZIPed text file with one Kafka message per line.
 
 It's actually a little more sophisticated than that though. We exploit a property of GZIP whereby multiple GZIP encoded files can be concatenated to produce a single file. Such a concatenated file is a valid GZIP file in its own right and will be decompressed by _any GZIP implementation_ to a single stream of lines -- exactly as if the input files were concatenated first and compressed together.
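
As a rough illustration of the concatenation property described above, the sketch below writes two GZIP members into a single file by calling `finish()` on each compressor without closing the underlying stream (mirroring how the block writers complete a chunk), then reads the file back with a plain `GZIPInputStream` as one stream of lines. The class name and scratch-file path are hypothetical and not part of the connector.

```java
import java.io.BufferedReader;
import java.io.FileInputStream;
import java.io.FileOutputStream;
import java.io.InputStreamReader;
import java.nio.charset.StandardCharsets;
import java.util.zip.GZIPInputStream;
import java.util.zip.GZIPOutputStream;

public class GzipConcatenationSketch {
    public static void main(String[] args) throws Exception {
        String path = "/tmp/gzip-concat-sketch.gz"; // hypothetical scratch file

        try (FileOutputStream file = new FileOutputStream(path)) {
            // First GZIP member, analogous to one finished chunk.
            GZIPOutputStream first = new GZIPOutputStream(file);
            first.write("record one\n".getBytes(StandardCharsets.UTF_8));
            first.finish(); // complete this member without closing the underlying file

            // Second member appended to the same file.
            GZIPOutputStream second = new GZIPOutputStream(file);
            second.write("record two\n".getBytes(StandardCharsets.UTF_8));
            second.finish();
        }

        // A single GZIPInputStream decompresses both members as one stream of lines.
        try (BufferedReader r = new BufferedReader(new InputStreamReader(
                new GZIPInputStream(new FileInputStream(path)), StandardCharsets.UTF_8))) {
            String line;
            while ((line = r.readLine()) != null) {
                System.out.println(line); // prints "record one" then "record two"
            }
        }
    }
}
```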