Skip to content

Commit

Permalink
Issue 50 (#51)
Browse files Browse the repository at this point in the history
* Partial support for ELF. #50

* Full support for ELF. Added InputFileDequeue to reduce the number of calls to list the directory contents. It queries the directory only when needed, reducing the number of stat calls.
  • Loading branch information
jcustenborder authored May 2, 2018
1 parent cac32a7 commit b85ffb6
Show file tree
Hide file tree
Showing 31 changed files with 1,125 additions and 105 deletions.
1 change: 1 addition & 0 deletions .gitignore
Original file line number Diff line number Diff line change
@@ -1,3 +1,4 @@
target
*.iml
.okhttpcache
ELFTesting.properties
31 changes: 15 additions & 16 deletions bin/debug.sh
Original file line number Diff line number Diff line change
Expand Up @@ -6,7 +6,7 @@
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
Expand All @@ -18,25 +18,24 @@
: ${INPUT_PATH:='/tmp/spooldir/input'}
: ${ERROR_PATH:='/tmp/spooldir/error'}
: ${FINISHED_PATH:='/tmp/spooldir/finished'}
: ${DEBUG_SUSPEND_FLAG:='n'}
export KAFKA_DEBUG='y'

: ${DEBUG_SUSPEND_FLAG:='y'}
export KAFKA_DEBUG='n'
export KAFKA_OPTS='-agentpath:/Applications/YourKit-Java-Profiler-2017.02.app/Contents/Resources/bin/mac/libyjpagent.jnilib=disablestacktelemetry,exceptions=disable,delay=10000'
set -e

mvn clean package

if [ ! -d "${INPUT_PATH}" ]; then
mkdir -p "${INPUT_PATH}"
fi

if [ ! -d "${ERROR_PATH}" ]; then
mkdir -p "${ERROR_PATH}"
fi
#if [ ! -d "${INPUT_PATH}" ]; then
# mkdir -p "${INPUT_PATH}"
#fi

if [ ! -d "${FINISHED_PATH}" ]; then
mkdir -p "${FINISHED_PATH}"
fi
#if [ ! -d "${ERROR_PATH}" ]; then
# mkdir -p "${ERROR_PATH}"
#fi

cp src/test/resources/com/github/jcustenborder/kafka/connect/spooldir/csv/FieldsMatch.data "${INPUT_PATH}/FieldsMatch.csv"
#if [ ! -d "${FINISHED_PATH}" ]; then
# mkdir -p "${FINISHED_PATH}"
#fi

connect-standalone config/connect-avro-docker.properties config/CSVExample.properties
#cp src/test/resources/com/github/jcustenborder/kafka/connect/spooldir/csv/FieldsMatch.data "${INPUT_PATH}/FieldsMatch.csv
connect-standalone config/connect-avro-docker.properties config/ELFTesting.properties
2 changes: 1 addition & 1 deletion config/CSVExample.properties
Original file line number Diff line number Diff line change
Expand Up @@ -5,7 +5,7 @@
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
Expand Down
26 changes: 26 additions & 0 deletions config/ELFTesting.properties
Original file line number Diff line number Diff line change
@@ -0,0 +1,26 @@
#
# Copyright © 2016 Jeremy Custenborder ([email protected])
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
#

name=elftesting
tasks.max=1
connector.class=com.github.jcustenborder.kafka.connect.spooldir.elf.SpoolDirELFSourceConnector
input.file.pattern=^.*\.gz$
finished.path=/Users/jeremy/data/confluent/logs/packages/finished
input.path=/Users/jeremy/data/confluent/logs/packages
error.path=/Users/jeremy/data/confluent/logs/packages/error
halt.on.error=true
topic=cloudfront
schema.generation.enabled=true
10 changes: 5 additions & 5 deletions config/connect-avro-docker.properties
Original file line number Diff line number Diff line change
Expand Up @@ -5,7 +5,7 @@
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
Expand All @@ -14,14 +14,14 @@
# limitations under the License.
#

bootstrap.servers=confluent:9092
bootstrap.servers=kafka:9092
key.converter=io.confluent.connect.avro.AvroConverter
key.converter.schema.registry.url=http://confluent:8081
key.converter.schema.registry.url=http://schema-registry:8081
value.converter=io.confluent.connect.avro.AvroConverter
value.converter.schema.registry.url=http://confluent:8081
value.converter.schema.registry.url=http://schema-registry:8081
internal.key.converter=org.apache.kafka.connect.json.JsonConverter
internal.value.converter=org.apache.kafka.connect.json.JsonConverter
internal.key.converter.schemas.enable=false
internal.value.converter.schemas.enable=false
offset.storage.file.filename=/tmp/connect.offsets
plugin.path=target/kafka-connect-target/usr/share/java
plugin.path=target/kafka-connect-target/usr/share/kafka-connect
11 changes: 6 additions & 5 deletions docker-compose.yml
Original file line number Diff line number Diff line change
Expand Up @@ -5,7 +5,7 @@
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
Expand All @@ -17,22 +17,23 @@
version: "2"
services:
zookeeper:
image: confluentinc/cp-zookeeper:3.3.0
image: confluentinc/cp-zookeeper:4.1.0
ports:
- "2181:2181"
environment:
ZOOKEEPER_CLIENT_PORT: 2181
kafka:
image: confluentinc/cp-kafka:3.3.0
image: confluentinc/cp-kafka:4.1.0
depends_on:
- zookeeper
ports:
- "9092:9092"
environment:
KAFKA_ZOOKEEPER_CONNECT: "zookeeper:2181"
KAFKA_ADVERTISED_LISTENERS: "plaintext://confluent:9092"
KAFKA_ADVERTISED_LISTENERS: "plaintext://kafka:9092"
KAFKA_OFFSETS_TOPIC_REPLICATION_FACTOR: 1
schema-registry:
image: confluentinc/cp-schema-registry:3.3.0
image: confluentinc/cp-schema-registry:4.1.0
depends_on:
- kafka
- zookeeper
Expand Down
22 changes: 22 additions & 0 deletions pom.xml
Original file line number Diff line number Diff line change
@@ -1,4 +1,21 @@
<?xml version="1.0"?>
<!--
Copyright © 2016 Jeremy Custenborder ([email protected])
Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
You may obtain a copy of the License at
http://www.apache.org/licenses/LICENSE-2.0
Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
-->
<project xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd"
xmlns="http://maven.apache.org/POM/4.0.0"
xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance">
Expand Down Expand Up @@ -57,6 +74,11 @@
<artifactId>commons-compress</artifactId>
<version>1.16.1</version>
</dependency>
<dependency>
<groupId>com.github.jcustenborder.parsers</groupId>
<artifactId>extended-log-format</artifactId>
<version>[0.0.1.2, 0.0.1.1000)</version>
</dependency>
</dependencies>
<build>
<plugins>
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,90 @@
/**
* Copyright © 2016 Jeremy Custenborder ([email protected])
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package com.github.jcustenborder.kafka.connect.spooldir;

import com.google.common.collect.ForwardingDeque;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import java.io.File;
import java.util.ArrayDeque;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Comparator;
import java.util.Deque;
import java.util.List;

/**
 * A {@link Deque} of input files to process. The backing deque is built lazily:
 * the input directory is only listed when the cached deque is null or has been
 * drained, which reduces the number of directory listings and stat calls.
 */
public class InputFileDequeue extends ForwardingDeque<File> {
  private static final Logger log = LoggerFactory.getLogger(InputFileDequeue.class);

  // Connector configuration supplying the input path, filename filter,
  // processing file extension, and minimum file age.
  private final SpoolDirSourceConnectorConfig config;

  // Cached queue of files to process; refilled lazily by delegate() once empty.
  Deque<File> files;

  public InputFileDequeue(SpoolDirSourceConnectorConfig config) {
    this.config = config;
  }

  /**
   * Returns the "processing" marker file for an input file: the same directory,
   * with the processing extension appended to the input file's name.
   *
   * @param processingFileExtension extension appended to the input file's name.
   * @param input input file to derive the marker for.
   * @return the marker file next to {@code input}.
   */
  public static File processingFile(String processingFileExtension, File input) {
    String fileName = input.getName() + processingFileExtension;
    return new File(input.getParentFile(), fileName);
  }

  /**
   * Lazily (re)builds the backing deque. Files with an existing processing
   * marker, or that are younger than the configured minimum age, are skipped.
   *
   * @return the cached deque if it still has entries; otherwise a freshly built
   *     deque of eligible files (empty — and not cached — when none are found).
   */
  @Override
  protected Deque<File> delegate() {
    if (null != this.files && !this.files.isEmpty()) {
      return this.files;
    }

    log.info("Searching for file in {}", this.config.inputPath);
    File[] input = this.config.inputPath.listFiles(this.config.inputFilenameFilter);
    if (null == input || input.length == 0) {
      log.info("No files matching {} were found in {}", SpoolDirSourceConnectorConfig.INPUT_FILE_PATTERN_CONF, this.config.inputPath);
      return new ArrayDeque<>();
    }
    // Deterministic processing order by file name.
    Arrays.sort(input, Comparator.comparing(File::getName));

    // Renamed from `files` so the local no longer shadows the cache field above.
    List<File> candidates = new ArrayList<>(input.length);
    for (File f : input) {
      File processingFile = processingFile(this.config.processingFileExtension, f);
      log.trace("Checking for processing file: {}", processingFile);

      if (processingFile.exists()) {
        // A processing marker means another task (or a prior run) owns this file.
        log.debug("Skipping {} because processing file exists.", f);
        continue;
      }
      candidates.add(f);
    }

    Deque<File> result = new ArrayDeque<>(candidates.size());

    for (File file : candidates) {
      long fileAgeMS = System.currentTimeMillis() - file.lastModified();

      if (fileAgeMS < 0L) {
        log.warn("File {} has a date in the future.", file);
      }

      // Skip files still being written, as approximated by the minimum-age setting.
      if (this.config.minimumFileAgeMS > 0L && fileAgeMS < this.config.minimumFileAgeMS) {
        log.debug("Skipping {} because it does not meet the minimum age.", file);
        continue;
      }
      result.add(file);
    }

    log.info("Found {} file(s) to process", result.size());
    return (this.files = result);
  }
}
Original file line number Diff line number Diff line change
Expand Up @@ -64,10 +64,12 @@ class SpoolDirCsvSourceConnectorConfig extends SpoolDirSourceConnectorConfig {

static final String CSV_SKIP_LINES_DOC = "Number of lines to skip in the beginning of the file.";
static final int CSV_SKIP_LINES_DEFAULT = CSVReader.DEFAULT_SKIP_LINES;
static final String CSV_SEPARATOR_CHAR_DOC = "The character that seperates each field. Typically in a CSV this is a , character. A TSV would use \\t.";
static final String CSV_SEPARATOR_CHAR_DOC = "The character that separates each field in the form " +
"of an integer. Typically in a CSV this is a ,(44) character. A TSV would use a tab(9) character.";
static final int CSV_SEPARATOR_CHAR_DEFAULT = (int) CSVParser.DEFAULT_SEPARATOR;
static final int CSV_QUOTE_CHAR_DEFAULT = (int) CSVParser.DEFAULT_QUOTE_CHARACTER;
static final String CSV_ESCAPE_CHAR_DOC = "Escape character.";
static final String CSV_ESCAPE_CHAR_DOC = "The character as an integer to use when a special " +
"character is encountered. The default escape character is typically a \\(92)";
static final int CSV_ESCAPE_CHAR_DEFAULT = (int) CSVParser.DEFAULT_ESCAPE_CHARACTER;
static final String CSV_STRICT_QUOTES_DOC = "Sets the strict quotes setting - if true, characters outside the quotes are ignored.";
static final boolean CSV_STRICT_QUOTES_DEFAULT = CSVParser.DEFAULT_STRICT_QUOTES;
Expand Down Expand Up @@ -288,6 +290,11 @@ public CSVReaderBuilder createCSVReaderBuilder(Reader reader, CSVParser parser)
.withFieldAsNull(nullFieldIndicator);
}

@Override
public boolean schemasRequired() {
return true;
}

static class CharsetValidator implements ConfigDef.Validator {
static CharsetValidator of() {
return new CharsetValidator();
Expand Down
Original file line number Diff line number Diff line change
@@ -1,12 +1,12 @@
/**
* Copyright © 2016 Jeremy Custenborder ([email protected])
* <p>
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
* <p>
*
* http://www.apache.org/licenses/LICENSE-2.0
* <p>
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -24,6 +24,11 @@ public SpoolDirJsonSourceConnectorConfig(final boolean isTask, Map<String, ?> se
super(isTask, config(), settings);
}

@Override
public boolean schemasRequired() {
return true;
}

public static ConfigDef config() {
return SpoolDirSourceConnectorConfig.config();
}
Expand Down
Original file line number Diff line number Diff line change
@@ -1,12 +1,12 @@
/**
* Copyright © 2016 Jeremy Custenborder ([email protected])
* <p>
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
* <p>
*
* http://www.apache.org/licenses/LICENSE-2.0
* <p>
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -56,7 +56,7 @@ public void start(final Map<String, String> input) {
this.config = config(input);
final Map<String, String> settings = new LinkedHashMap<>(input);

if (null == this.config.valueSchema || null == this.config.keySchema) {
if (this.config.schemasRequired() && (null == this.config.valueSchema || null == this.config.keySchema)) {
log.info("Key or Value schema was not defined. Running schema generator.");
SchemaGenerator<CONF> generator = generator(settings);

Expand Down
Loading

0 comments on commit b85ffb6

Please sign in to comment.