Added BZip2 support #24

Open · wants to merge 13 commits into base: master
3 changes: 3 additions & 0 deletions .gitignore
@@ -1,6 +1,9 @@
local-*
# Created by https://www.gitignore.io/api/maven

.idea
*.iml

### Maven ###
target/
pom.xml.tag
26 changes: 6 additions & 20 deletions README.md
@@ -14,7 +14,7 @@ This was built against Kafka 0.10.1.1.

## Block-GZIP Output Format

For now there is just one output format which is essentially just a GZIPed text file with one Kafka message per line.
This format is essentially just a GZIPed text file with one Kafka message per line.

It's actually a little more sophisticated than that though. We exploit a property of GZIP whereby multiple GZIP encoded files can be concatenated to produce a single file. Such a concatenated file is a valid GZIP file in its own right and will be decompressed by _any GZIP implementation_ to a single stream of lines -- exactly as if the input files were concatenated first and compressed together.
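
To make this concrete, here is a small standalone Java sketch (not part of the connector) showing that two independently GZIP-compressed chunks written back to back decompress as a single stream of lines:

```java
import java.io.*;
import java.nio.charset.StandardCharsets;
import java.util.zip.GZIPInputStream;
import java.util.zip.GZIPOutputStream;

public class GzipConcatDemo {
    public static void main(String[] args) throws IOException {
        File out = File.createTempFile("concat", ".gz");

        // Write two independent GZIP members into the same file.
        try (FileOutputStream fos = new FileOutputStream(out)) {
            for (String chunk : new String[]{"line one\n", "line two\n"}) {
                GZIPOutputStream gz = new GZIPOutputStream(fos);
                gz.write(chunk.getBytes(StandardCharsets.UTF_8));
                gz.finish(); // complete this member but keep the underlying file open
            }
        }

        // A standard GZIP reader sees one continuous stream of lines.
        try (BufferedReader r = new BufferedReader(new InputStreamReader(
                new GZIPInputStream(new FileInputStream(out)), StandardCharsets.UTF_8))) {
            String line;
            while ((line = r.readLine()) != null) {
                System.out.println(line); // prints "line one" then "line two"
            }
        }
    }
}
```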

@@ -86,11 +86,9 @@ $ cat system-test-00000-000000000000.index.json | jq -M '.'
- Depending on your needs you can either limit to just the single block, or if you want to consume all records after that offset, you can consume from the offset right to the end of the file
- The bytes returned by the range request can be decompressed as a GZIP file on their own with any GZIP-compatible tool, provided you limit the range to whole block boundaries (see the sketch below).
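
A minimal Java sketch of reading a single block back with a ranged GET. The bucket, key, and offsets below are placeholders; real values come from the `*.index.json` file, and the snippet assumes a 1.11.x AWS SDK (the version this PR adds for its tests), which provides `AmazonS3ClientBuilder`:

```java
import java.io.BufferedReader;
import java.io.IOException;
import java.io.InputStreamReader;
import java.nio.charset.StandardCharsets;
import java.util.zip.GZIPInputStream;

import com.amazonaws.services.s3.AmazonS3;
import com.amazonaws.services.s3.AmazonS3ClientBuilder;
import com.amazonaws.services.s3.model.GetObjectRequest;
import com.amazonaws.services.s3.model.S3Object;

public class BlockRangeReadDemo {
    public static void main(String[] args) throws IOException {
        // Placeholder offsets; in practice read byteOffset and compressedByteLength
        // for the block of interest out of the *.index.json file.
        long byteOffset = 12345;
        long compressedByteLength = 6789;

        AmazonS3 s3 = AmazonS3ClientBuilder.defaultClient();
        GetObjectRequest req = new GetObjectRequest("my-bucket", "my-prefix/system-test-00000-000000000000.gz")
                .withRange(byteOffset, byteOffset + compressedByteLength - 1); // inclusive range

        try (S3Object obj = s3.getObject(req);
             BufferedReader reader = new BufferedReader(new InputStreamReader(
                     new GZIPInputStream(obj.getObjectContent()), StandardCharsets.UTF_8))) {
            String line;
            while ((line = reader.readLine()) != null) {
                System.out.println(line); // only the records contained in that block
            }
        }
    }
}
```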

## Other Formats
## BZip2 Output Format

For now we only support Block-GZIP output. This assumes that all your kafka messages can be output as newline-delimited text files.

We could make the output format pluggable if others have use for this connector, but need binary serialisation formats like Avro/Thrift/Protobuf etc. Pull requests welcome.
This works exactly the same way as the Block-GZIP output format described above, except that each block is compressed with BZip2 instead of GZIP.
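
As with GZIP, concatenated BZip2 streams decompress as one. A small standalone sketch (not part of the connector) using the same commons-compress library the new writer uses:

```java
import java.io.*;
import java.nio.charset.StandardCharsets;

import org.apache.commons.compress.compressors.bzip2.BZip2CompressorInputStream;
import org.apache.commons.compress.compressors.bzip2.BZip2CompressorOutputStream;

public class BZip2ConcatDemo {
    public static void main(String[] args) throws IOException {
        File out = File.createTempFile("concat", ".bzip2");

        // Write two independent BZip2 streams back to back into one file.
        try (FileOutputStream fos = new FileOutputStream(out)) {
            for (String chunk : new String[]{"line one\n", "line two\n"}) {
                BZip2CompressorOutputStream bz = new BZip2CompressorOutputStream(fos);
                bz.write(chunk.getBytes(StandardCharsets.UTF_8));
                bz.finish(); // end this stream without closing the file
            }
        }

        // decompressConcatenated=true tells commons-compress to read all streams as one.
        try (BufferedReader r = new BufferedReader(new InputStreamReader(
                new BZip2CompressorInputStream(new FileInputStream(out), true), StandardCharsets.UTF_8))) {
            String line;
            while ((line = r.readLine()) != null) {
                System.out.println(line); // "line one", then "line two"
            }
        }
    }
}
```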

## Build and Run

@@ -113,23 +111,11 @@ In addition to the [standard kafka-connect config options](http://kafka.apache.o
| s3.endpoint | AWS defaults per region | Mostly useful for testing. |
| s3.path_style | `false` | Force path-style access to bucket rather than subdomain. Mostly useful for tests. |
| compressed_block_size | 67108864 | How much _uncompressed_ data to write to the file before we roll to a new block/chunk. See the [Block-GZIP](#user-content-block-gzip-output-format) section above. |
| compression.type | `gzip` | The compression algorithm, either `gzip` or `bzip2`. |
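
Illustrative only: a rough sketch of how a task might dispatch on `compression.type`. The real wiring lives in the connector's task code, and `BlockGZIPFileWriter` is assumed here to share the constructor shape of the `BlockBZIP2FileWriter` added further down in this PR.

```java
package com.deviantart.kafka_connect_s3;

import java.io.IOException;

// Sketch only, not the PR's actual dispatch code.
public class WriterFactorySketch {

    public static BlockFileWriter create(String compressionType, String filenameBase, String path,
                                         long firstRecordOffset, long chunkThreshold) throws IOException {
        if ("bzip2".equals(compressionType)) {
            return new BlockBZIP2FileWriter(filenameBase, path, firstRecordOffset, chunkThreshold);
        }
        // compression.type defaults to gzip
        return new BlockGZIPFileWriter(filenameBase, path, firstRecordOffset, chunkThreshold);
    }
}
```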

Note that we use the default AWS SDK credentials provider. [Refer to their docs](http://docs.aws.amazon.com/AWSSdkDocsJava/latest/DeveloperGuide/credentials.html#id1) for the options for configuring S3 credentials.

## Testing

Most of the custom logic for handling output formatting and managing S3 has reasonable mocked unit tests. There are probably improvements that can be made, but the logic is not especially complex.

There is also a basic system test to validate the integration with kafka-connect. It is not complete, nor is it 100% deterministic, due to the vagaries of multiple systems with non-deterministic things like timers affecting behaviour.

But it does consistently pass when run by hand on my Mac and validates basic operation of:

- Initialisation and consuming/flushing all expected data
- Resuming correctly on restart based on S3 state not relying on local disk state
- Reconfiguring partitions of a topic and correctly resuming each

It doesn't test distributed mode operation yet, however the above is enough to exercise all of the integration points with the kafka-connect runtime.

### System Test Setup

See [the README in the system_test dir](/system_test/README.md) for details on setting up dependencies and environment to run the tests.
Integration tests can be run using Docker: simply run the `run-integration-tests.sh` script on a Mac or Linux system.
55 changes: 55 additions & 0 deletions docker-compose.yml
@@ -0,0 +1,55 @@
---
version: '2'
services:
  zookeeper:
    image: confluentinc/cp-zookeeper
    hostname: zookeeper
    ports:
      - '2181:2181'
    environment:
      ZOOKEEPER_CLIENT_PORT: 2181
      ZOOKEEPER_TICK_TIME: 2000

  broker:
    image: confluentinc/cp-kafka
    hostname: broker
    depends_on:
      - zookeeper
    ports:
      - '9092:9092'
    environment:
      - KAFKA_BROKER_ID=1
      - KAFKA_ZOOKEEPER_CONNECT=zookeeper:2181
      - KAFKA_LISTENERS=PLAINTEXT://0.0.0.0:9092
      - KAFKA_ADVERTISED_LISTENERS

  fakes3:
    image: lphoward/fake-s3
    hostname: fakes3
    ports:
      - '4569:4569'

  connect:
    image: confluentinc/cp-kafka-connect
    hostname: connect
    depends_on:
      - zookeeper
      - broker
    ports:
      - "8083:8083"
    environment:
      CONNECT_BOOTSTRAP_SERVERS: 'broker:9092'
      CONNECT_REST_ADVERTISED_HOST_NAME: connect
      CONNECT_REST_PORT: 8083
      CONNECT_GROUP_ID: compose-connect-group
      CONNECT_CONFIG_STORAGE_TOPIC: docker-connect-configs
      CONNECT_OFFSET_STORAGE_TOPIC: docker-connect-offsets
      CONNECT_STATUS_STORAGE_TOPIC: docker-connect-status
      CONNECT_KEY_CONVERTER: org.apache.kafka.connect.storage.StringConverter
      CONNECT_VALUE_CONVERTER: org.apache.kafka.connect.storage.StringConverter
      CONNECT_INTERNAL_KEY_CONVERTER: org.apache.kafka.connect.json.JsonConverter
      CONNECT_INTERNAL_VALUE_CONVERTER: org.apache.kafka.connect.json.JsonConverter
      CONNECT_ZOOKEEPER_CONNECT: 'zookeeper:2181'
      CONNECT_CONSUMER_MAX_POLL_INTERVAL_MS: 1000
    volumes:
      - './target:/etc/kafka-connect/jars'
3 changes: 2 additions & 1 deletion example-connect-s3-sink.properties
@@ -4,4 +4,5 @@ tasks.max=1
topics=test
s3.bucket=connect-test
s3.prefix=connect-test
local.buffer.dir=/tmp/kafka-connect-s3.buffer
local.buffer.dir=/tmp/kafka-connect-s3.buffer
compression.type=gzip
37 changes: 0 additions & 37 deletions example-connect-worker.properties

This file was deleted.

28 changes: 26 additions & 2 deletions pom.xml
@@ -4,7 +4,7 @@

  <groupId>com.deviantart</groupId>
  <artifactId>kafka-connect-s3</artifactId>
  <version>0.0.3</version>
  <version>0.0.4</version>
  <packaging>jar</packaging>

  <name>kafka-connect-s3</name>
@@ -45,6 +45,19 @@
          </execution>
        </executions>
      </plugin>
      <plugin>
        <groupId>org.apache.maven.plugins</groupId>
        <artifactId>maven-failsafe-plugin</artifactId>
        <version>2.19.1</version>
        <executions>
          <execution>
            <goals>
              <goal>integration-test</goal>
              <goal>verify</goal>
            </goals>
          </execution>
        </executions>
      </plugin>
    </plugins>
  </build>

@@ -70,10 +83,15 @@
      <artifactId>aws-java-sdk-s3</artifactId>
      <version>${s3.version}</version>
    </dependency>
    <dependency>
      <groupId>org.apache.commons</groupId>
      <artifactId>commons-compress</artifactId>
      <version>1.13</version>
    </dependency>
    <dependency>
      <groupId>junit</groupId>
      <artifactId>junit</artifactId>
      <version>3.8.1</version>
      <version>4.11</version>
      <scope>test</scope>
    </dependency>
    <dependency>
@@ -82,5 +100,11 @@
      <version>1.9.5</version>
      <scope>test</scope>
    </dependency>
    <dependency>
      <groupId>com.amazonaws</groupId>
      <artifactId>aws-java-sdk</artifactId>
      <version>1.11.109</version>
      <scope>test</scope>
    </dependency>
  </dependencies>
</project>
43 changes: 43 additions & 0 deletions run-integration-tests.sh
@@ -0,0 +1,43 @@
#!/usr/bin/env bash

set -e

trap "docker-compose down" EXIT

if uname | grep -q Linux; then
    export DOCKER_BIND_IP=$(ip addr | grep 'eth0:' -A2 | tail -n1 | awk '{print $2}' | cut -f1 -d'/')
elif uname | grep -q Darwin; then
    export DOCKER_BIND_IP=$(ifconfig en0 | grep inet | grep -v inet6 | awk '{print $2}')
else
    echo "Unsupported operating system."
    exit 1
fi

echo -e "\nDocker bind IP address: $DOCKER_BIND_IP\n"

export KAFKA_HOST=$DOCKER_BIND_IP
export KAFKA_ADVERTISED_LISTENERS="PLAINTEXT://${KAFKA_HOST}:9092"
export KAFKA_BROKERS="${KAFKA_HOST}:9092"

# Building connector JAR and starting test cluster
mvn clean package
docker-compose up -d
sleep 10

# Creating topic for test messages
docker-compose exec broker \
    kafka-topics --zookeeper zookeeper:2181 \
    --topic test-topic \
    --create \
    --partitions 3 \
    --replication-factor 1

sleep 10

# Submitting JSON configuration
curl -H "Content-Type: application/json" \
    --data "@src/test/resources/test-connector.json" \
    http://localhost:8083/connectors

# Running integration tests
mvn verify
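
For reference, a hypothetical sketch of what one of the failsafe integration tests (the `*IT` classes under `src/test`) might look like; the class name, record count, bucket/prefix, and sleep times are placeholders, not the PR's actual test code, and it assumes kafka-clients and the test-scoped AWS SDK are on the classpath:

```java
package com.deviantart.kafka_connect_s3;

import java.util.Properties;

import com.amazonaws.auth.AWSStaticCredentialsProvider;
import com.amazonaws.auth.BasicAWSCredentials;
import com.amazonaws.client.builder.AwsClientBuilder;
import com.amazonaws.services.s3.AmazonS3;
import com.amazonaws.services.s3.AmazonS3ClientBuilder;
import org.apache.kafka.clients.producer.KafkaProducer;
import org.apache.kafka.clients.producer.ProducerRecord;
import org.junit.Test;

import static org.junit.Assert.assertFalse;

// Hypothetical sketch only: names and timings are illustrative.
public class ConnectorSketchIT {

    @Test
    public void writesCompressedFilesToS3() throws Exception {
        // Produce some records to the topic created by run-integration-tests.sh.
        Properties props = new Properties();
        props.put("bootstrap.servers", System.getenv("KAFKA_BROKERS"));
        props.put("key.serializer", "org.apache.kafka.common.serialization.StringSerializer");
        props.put("value.serializer", "org.apache.kafka.common.serialization.StringSerializer");
        try (KafkaProducer<String, String> producer = new KafkaProducer<>(props)) {
            for (int i = 0; i < 100; i++) {
                producer.send(new ProducerRecord<>("test-topic", "message-" + i));
            }
        }

        // Give the connector time to flush, then look for output objects in fake-s3.
        Thread.sleep(30000);
        AmazonS3 s3 = AmazonS3ClientBuilder.standard()
                .withCredentials(new AWSStaticCredentialsProvider(new BasicAWSCredentials("fake", "fake")))
                .withEndpointConfiguration(new AwsClientBuilder.EndpointConfiguration("http://localhost:4569", "us-east-1"))
                .withPathStyleAccessEnabled(true)
                .build();
        assertFalse(s3.listObjects("connect-test", "connect-test").getObjectSummaries().isEmpty());
    }
}
```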
69 changes: 69 additions & 0 deletions src/main/java/com/deviantart/kafka_connect_s3/BlockBZIP2FileWriter.java
@@ -0,0 +1,69 @@
package com.deviantart.kafka_connect_s3;

import java.io.BufferedWriter;
import java.io.FileNotFoundException;
import java.io.IOException;
import java.io.OutputStreamWriter;
import java.io.UnsupportedEncodingException;

import org.apache.commons.compress.compressors.bzip2.BZip2CompressorOutputStream;

/**
 * BlockBZIP2FileWriter accumulates newline-delimited UTF-8 records and writes them to an
 * output file that is readable by any standard BZIP2 decoder.
 *
 * In fact this file is the concatenation of possibly many separate BZIP2 files corresponding to smaller chunks
 * of the input. Alongside the output filename.bzip2 file, a file filename-index.json is written containing JSON
 * metadata about the size and location of each block.
 *
 * This allows a reading class to skip to a particular line/record without decompressing the whole file, by looking up
 * the offset of the containing block, seeking to it and beginning a BZIP2 read from there.
 *
 * This is especially useful when the file is an archive in HTTP storage like Amazon S3, where a GET request with
 * range headers can pull a small segment out of the overall compressed file.
 *
 * Note that thanks to the BZIP2 spec, the overall file is perfectly valid and will decompress as if it were a single
 * stream with any regular BZIP2 decoding library or program.
 */
public class BlockBZIP2FileWriter extends BlockFileWriter {

    private BZip2CompressorOutputStream bzip2Stream;

    public BlockBZIP2FileWriter(String filenameBase, String path) throws FileNotFoundException, IOException {
        this(filenameBase, path, 0, 67108864);
    }

    public BlockBZIP2FileWriter(String filenameBase, String path, long firstRecordOffset) throws FileNotFoundException, IOException {
        this(filenameBase, path, firstRecordOffset, 67108864);
    }

    public BlockBZIP2FileWriter(String filenameBase, String path, long firstRecordOffset, long chunkThreshold) throws FileNotFoundException, IOException {
        super(filenameBase, path, firstRecordOffset, chunkThreshold);
    }

    @Override
    protected void initChunkWriter() throws IOException, UnsupportedEncodingException {
        bzip2Stream = new BZip2CompressorOutputStream(fileStream);
        writer = new BufferedWriter(new OutputStreamWriter(bzip2Stream, "UTF-8"));
    }

    @Override
    protected void finishChunk() throws IOException {
        Chunk ch = currentChunk();

        // Complete the BZIP2 block without closing the underlying stream
        writer.flush();
        bzip2Stream.finish();

        // We can now find out how long this chunk was once compressed
        long bytesWritten = fileStream.getNumBytesWritten();
        ch.compressedByteLength = bytesWritten - ch.byteOffset;
    }

    @Override
    public String getDataFileName() {
        return String.format("%s-%012d.bzip2", filenameBase, super.getFirstRecordOffset());
    }
}
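
For reference, `getDataFileName` zero-pads the first record offset to 12 digits, mirroring the `.gz` file naming shown earlier in the README. A tiny sketch with a placeholder `filenameBase`:

```java
public class FileNameDemo {
    public static void main(String[] args) {
        // "test-topic-00000" is a hypothetical filenameBase; offset 42 is zero-padded to 12 digits.
        System.out.println(String.format("%s-%012d.bzip2", "test-topic-00000", 42L));
        // prints: test-topic-00000-000000000042.bzip2
    }
}
```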