diff --git a/.gitignore b/.gitignore index b598975..b890f2a 100644 --- a/.gitignore +++ b/.gitignore @@ -1,3 +1,4 @@ target *.iml .okhttpcache +ELFTesting.properties diff --git a/bin/debug.sh b/bin/debug.sh index 5bc0770..0cf5dd3 100755 --- a/bin/debug.sh +++ b/bin/debug.sh @@ -6,7 +6,7 @@ # you may not use this file except in compliance with the License. # You may obtain a copy of the License at # -# http://www.apache.org/licenses/LICENSE-2.0 +# http://www.apache.org/licenses/LICENSE-2.0 # # Unless required by applicable law or agreed to in writing, software # distributed under the License is distributed on an "AS IS" BASIS, @@ -18,25 +18,24 @@ : ${INPUT_PATH:='/tmp/spooldir/input'} : ${ERROR_PATH:='/tmp/spooldir/error'} : ${FINISHED_PATH:='/tmp/spooldir/finished'} -: ${DEBUG_SUSPEND_FLAG:='n'} -export KAFKA_DEBUG='y' - +: ${DEBUG_SUSPEND_FLAG:='y'} +export KAFKA_DEBUG='n' +export KAFKA_OPTS='-agentpath:/Applications/YourKit-Java-Profiler-2017.02.app/Contents/Resources/bin/mac/libyjpagent.jnilib=disablestacktelemetry,exceptions=disable,delay=10000' set -e mvn clean package -if [ ! -d "${INPUT_PATH}" ]; then - mkdir -p "${INPUT_PATH}" -fi - -if [ ! -d "${ERROR_PATH}" ]; then - mkdir -p "${ERROR_PATH}" -fi +#if [ ! -d "${INPUT_PATH}" ]; then +# mkdir -p "${INPUT_PATH}" +#fi -if [ ! -d "${FINISHED_PATH}" ]; then - mkdir -p "${FINISHED_PATH}" -fi +#if [ ! -d "${ERROR_PATH}" ]; then +# mkdir -p "${ERROR_PATH}" +#fi -cp src/test/resources/com/github/jcustenborder/kafka/connect/spooldir/csv/FieldsMatch.data "${INPUT_PATH}/FieldsMatch.csv" +#if [ ! -d "${FINISHED_PATH}" ]; then +# mkdir -p "${FINISHED_PATH}" +#fi -connect-standalone config/connect-avro-docker.properties config/CSVExample.properties \ No newline at end of file +#cp src/test/resources/com/github/jcustenborder/kafka/connect/spooldir/csv/FieldsMatch.data "${INPUT_PATH}/FieldsMatch.csv +connect-standalone config/connect-avro-docker.properties config/ELFTesting.properties \ No newline at end of file diff --git a/config/CSVExample.properties b/config/CSVExample.properties index 3985bff..182729a 100644 --- a/config/CSVExample.properties +++ b/config/CSVExample.properties @@ -5,7 +5,7 @@ # you may not use this file except in compliance with the License. # You may obtain a copy of the License at # -# http://www.apache.org/licenses/LICENSE-2.0 +# http://www.apache.org/licenses/LICENSE-2.0 # # Unless required by applicable law or agreed to in writing, software # distributed under the License is distributed on an "AS IS" BASIS, diff --git a/config/ELFTesting.properties b/config/ELFTesting.properties new file mode 100644 index 0000000..87cd867 --- /dev/null +++ b/config/ELFTesting.properties @@ -0,0 +1,26 @@ +# +# Copyright © 2016 Jeremy Custenborder (jcustenborder@gmail.com) +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. +# + +name=elftesting +tasks.max=1 +connector.class=com.github.jcustenborder.kafka.connect.spooldir.elf.SpoolDirELFSourceConnector +input.file.pattern=^.*\.gz$ +finished.path=/Users/jeremy/data/confluent/logs/packages/finished +input.path=/Users/jeremy/data/confluent/logs/packages +error.path=/Users/jeremy/data/confluent/logs/packages/error +halt.on.error=true +topic=cloudfront +schema.generation.enabled=true diff --git a/config/connect-avro-docker.properties b/config/connect-avro-docker.properties index a4ac036..74ffd50 100644 --- a/config/connect-avro-docker.properties +++ b/config/connect-avro-docker.properties @@ -5,7 +5,7 @@ # you may not use this file except in compliance with the License. # You may obtain a copy of the License at # -# http://www.apache.org/licenses/LICENSE-2.0 +# http://www.apache.org/licenses/LICENSE-2.0 # # Unless required by applicable law or agreed to in writing, software # distributed under the License is distributed on an "AS IS" BASIS, @@ -14,14 +14,14 @@ # limitations under the License. # -bootstrap.servers=confluent:9092 +bootstrap.servers=kafka:9092 key.converter=io.confluent.connect.avro.AvroConverter -key.converter.schema.registry.url=http://confluent:8081 +key.converter.schema.registry.url=http://schema-registry:8081 value.converter=io.confluent.connect.avro.AvroConverter -value.converter.schema.registry.url=http://confluent:8081 +value.converter.schema.registry.url=http://schema-registry:8081 internal.key.converter=org.apache.kafka.connect.json.JsonConverter internal.value.converter=org.apache.kafka.connect.json.JsonConverter internal.key.converter.schemas.enable=false internal.value.converter.schemas.enable=false offset.storage.file.filename=/tmp/connect.offsets -plugin.path=target/kafka-connect-target/usr/share/java \ No newline at end of file +plugin.path=target/kafka-connect-target/usr/share/kafka-connect \ No newline at end of file diff --git a/docker-compose.yml b/docker-compose.yml index f4b68dd..bd34bbb 100644 --- a/docker-compose.yml +++ b/docker-compose.yml @@ -5,7 +5,7 @@ # you may not use this file except in compliance with the License. # You may obtain a copy of the License at # -# http://www.apache.org/licenses/LICENSE-2.0 +# http://www.apache.org/licenses/LICENSE-2.0 # # Unless required by applicable law or agreed to in writing, software # distributed under the License is distributed on an "AS IS" BASIS, @@ -17,22 +17,23 @@ version: "2" services: zookeeper: - image: confluentinc/cp-zookeeper:3.3.0 + image: confluentinc/cp-zookeeper:4.1.0 ports: - "2181:2181" environment: ZOOKEEPER_CLIENT_PORT: 2181 kafka: - image: confluentinc/cp-kafka:3.3.0 + image: confluentinc/cp-kafka:4.1.0 depends_on: - zookeeper ports: - "9092:9092" environment: KAFKA_ZOOKEEPER_CONNECT: "zookeeper:2181" - KAFKA_ADVERTISED_LISTENERS: "plaintext://confluent:9092" + KAFKA_ADVERTISED_LISTENERS: "plaintext://kafka:9092" + KAFKA_OFFSETS_TOPIC_REPLICATION_FACTOR: 1 schema-registry: - image: confluentinc/cp-schema-registry:3.3.0 + image: confluentinc/cp-schema-registry:4.1.0 depends_on: - kafka - zookeeper diff --git a/pom.xml b/pom.xml index 5f5bdb9..b02a8c5 100644 --- a/pom.xml +++ b/pom.xml @@ -1,4 +1,21 @@ + @@ -57,6 +74,11 @@ commons-compress 1.16.1 + + com.github.jcustenborder.parsers + extended-log-format + [0.0.1.2, 0.0.1.1000) + diff --git a/src/main/java/com/github/jcustenborder/kafka/connect/spooldir/InputFileDequeue.java b/src/main/java/com/github/jcustenborder/kafka/connect/spooldir/InputFileDequeue.java new file mode 100644 index 0000000..6975da8 --- /dev/null +++ b/src/main/java/com/github/jcustenborder/kafka/connect/spooldir/InputFileDequeue.java @@ -0,0 +1,90 @@ +/** + * Copyright © 2016 Jeremy Custenborder (jcustenborder@gmail.com) + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package com.github.jcustenborder.kafka.connect.spooldir; + +import com.google.common.collect.ForwardingDeque; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import java.io.File; +import java.util.ArrayDeque; +import java.util.ArrayList; +import java.util.Arrays; +import java.util.Comparator; +import java.util.Deque; +import java.util.List; + +public class InputFileDequeue extends ForwardingDeque { + private static final Logger log = LoggerFactory.getLogger(InputFileDequeue.class); + private final SpoolDirSourceConnectorConfig config; + + public InputFileDequeue(SpoolDirSourceConnectorConfig config) { + this.config = config; + } + + public static File processingFile(String processingFileExtension, File input) { + String fileName = input.getName() + processingFileExtension; + return new File(input.getParentFile(), fileName); + } + + + Deque files; + + @Override + protected Deque delegate() { + if (null != files && !files.isEmpty()) { + return files; + } + + log.info("Searching for file in {}", this.config.inputPath); + File[] input = this.config.inputPath.listFiles(this.config.inputFilenameFilter); + if (null == input || input.length == 0) { + log.info("No files matching {} were found in {}", SpoolDirSourceConnectorConfig.INPUT_FILE_PATTERN_CONF, this.config.inputPath); + return new ArrayDeque<>(); + } + Arrays.sort(input, Comparator.comparing(File::getName)); + List files = new ArrayList<>(input.length); + for (File f : input) { + File processingFile = processingFile(this.config.processingFileExtension, f); + log.trace("Checking for processing file: {}", processingFile); + + if (processingFile.exists()) { + log.debug("Skipping {} because processing file exists.", f); + continue; + } + files.add(f); + } + + Deque result = new ArrayDeque<>(files.size()); + + for (File file : files) { + long fileAgeMS = System.currentTimeMillis() - file.lastModified(); + + if (fileAgeMS < 0L) { + log.warn("File {} has a date in the future.", file); + } + + if (this.config.minimumFileAgeMS > 0L && fileAgeMS < this.config.minimumFileAgeMS) { + log.debug("Skipping {} because it does not meet the minimum age.", file); + continue; + } + result.add(file); + } + + log.info("Found {} file(s) to process", result.size()); + return (this.files = result); + } +} diff --git a/src/main/java/com/github/jcustenborder/kafka/connect/spooldir/SpoolDirCsvSourceConnectorConfig.java b/src/main/java/com/github/jcustenborder/kafka/connect/spooldir/SpoolDirCsvSourceConnectorConfig.java index 106cc54..71fc622 100644 --- a/src/main/java/com/github/jcustenborder/kafka/connect/spooldir/SpoolDirCsvSourceConnectorConfig.java +++ b/src/main/java/com/github/jcustenborder/kafka/connect/spooldir/SpoolDirCsvSourceConnectorConfig.java @@ -64,10 +64,12 @@ class SpoolDirCsvSourceConnectorConfig extends SpoolDirSourceConnectorConfig { static final String CSV_SKIP_LINES_DOC = "Number of lines to skip in the beginning of the file."; static final int CSV_SKIP_LINES_DEFAULT = CSVReader.DEFAULT_SKIP_LINES; - static final String CSV_SEPARATOR_CHAR_DOC = "The character that seperates each field. Typically in a CSV this is a , character. A TSV would use \\t."; + static final String CSV_SEPARATOR_CHAR_DOC = "The character that separates each field in the form " + + "of an integer. Typically in a CSV this is a ,(44) character. A TSV would use a tab(9) character."; static final int CSV_SEPARATOR_CHAR_DEFAULT = (int) CSVParser.DEFAULT_SEPARATOR; static final int CSV_QUOTE_CHAR_DEFAULT = (int) CSVParser.DEFAULT_QUOTE_CHARACTER; - static final String CSV_ESCAPE_CHAR_DOC = "Escape character."; + static final String CSV_ESCAPE_CHAR_DOC = "The character as an integer to use when a special " + + "character is encountered. The default escape character is typically a \\(92)"; static final int CSV_ESCAPE_CHAR_DEFAULT = (int) CSVParser.DEFAULT_ESCAPE_CHARACTER; static final String CSV_STRICT_QUOTES_DOC = "Sets the strict quotes setting - if true, characters outside the quotes are ignored."; static final boolean CSV_STRICT_QUOTES_DEFAULT = CSVParser.DEFAULT_STRICT_QUOTES; @@ -288,6 +290,11 @@ public CSVReaderBuilder createCSVReaderBuilder(Reader reader, CSVParser parser) .withFieldAsNull(nullFieldIndicator); } + @Override + public boolean schemasRequired() { + return true; + } + static class CharsetValidator implements ConfigDef.Validator { static CharsetValidator of() { return new CharsetValidator(); diff --git a/src/main/java/com/github/jcustenborder/kafka/connect/spooldir/SpoolDirCsvSourceTask.java b/src/main/java/com/github/jcustenborder/kafka/connect/spooldir/SpoolDirCsvSourceTask.java index 83fd0c8..768a1e7 100644 --- a/src/main/java/com/github/jcustenborder/kafka/connect/spooldir/SpoolDirCsvSourceTask.java +++ b/src/main/java/com/github/jcustenborder/kafka/connect/spooldir/SpoolDirCsvSourceTask.java @@ -1,12 +1,12 @@ /** * Copyright © 2016 Jeremy Custenborder (jcustenborder@gmail.com) - *

+ * * Licensed under the Apache License, Version 2.0 (the "License"); * you may not use this file except in compliance with the License. * You may obtain a copy of the License at - *

+ * * http://www.apache.org/licenses/LICENSE-2.0 - *

+ * * Unless required by applicable law or agreed to in writing, software * distributed under the License is distributed on an "AS IS" BASIS, * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. diff --git a/src/main/java/com/github/jcustenborder/kafka/connect/spooldir/SpoolDirJsonSourceConnectorConfig.java b/src/main/java/com/github/jcustenborder/kafka/connect/spooldir/SpoolDirJsonSourceConnectorConfig.java index d81ddd7..9d3b909 100644 --- a/src/main/java/com/github/jcustenborder/kafka/connect/spooldir/SpoolDirJsonSourceConnectorConfig.java +++ b/src/main/java/com/github/jcustenborder/kafka/connect/spooldir/SpoolDirJsonSourceConnectorConfig.java @@ -24,6 +24,11 @@ public SpoolDirJsonSourceConnectorConfig(final boolean isTask, Map se super(isTask, config(), settings); } + @Override + public boolean schemasRequired() { + return true; + } + public static ConfigDef config() { return SpoolDirSourceConnectorConfig.config(); } diff --git a/src/main/java/com/github/jcustenborder/kafka/connect/spooldir/SpoolDirJsonSourceTask.java b/src/main/java/com/github/jcustenborder/kafka/connect/spooldir/SpoolDirJsonSourceTask.java index 0395d11..70e9bb5 100644 --- a/src/main/java/com/github/jcustenborder/kafka/connect/spooldir/SpoolDirJsonSourceTask.java +++ b/src/main/java/com/github/jcustenborder/kafka/connect/spooldir/SpoolDirJsonSourceTask.java @@ -1,12 +1,12 @@ /** * Copyright © 2016 Jeremy Custenborder (jcustenborder@gmail.com) - *

+ * * Licensed under the Apache License, Version 2.0 (the "License"); * you may not use this file except in compliance with the License. * You may obtain a copy of the License at - *

+ * * http://www.apache.org/licenses/LICENSE-2.0 - *

+ * * Unless required by applicable law or agreed to in writing, software * distributed under the License is distributed on an "AS IS" BASIS, * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. diff --git a/src/main/java/com/github/jcustenborder/kafka/connect/spooldir/SpoolDirSourceConnector.java b/src/main/java/com/github/jcustenborder/kafka/connect/spooldir/SpoolDirSourceConnector.java index 4567ff9..99580fb 100644 --- a/src/main/java/com/github/jcustenborder/kafka/connect/spooldir/SpoolDirSourceConnector.java +++ b/src/main/java/com/github/jcustenborder/kafka/connect/spooldir/SpoolDirSourceConnector.java @@ -56,7 +56,7 @@ public void start(final Map input) { this.config = config(input); final Map settings = new LinkedHashMap<>(input); - if (null == this.config.valueSchema || null == this.config.keySchema) { + if (this.config.schemasRequired() && (null == this.config.valueSchema || null == this.config.keySchema)) { log.info("Key or Value schema was not defined. Running schema generator."); SchemaGenerator generator = generator(settings); diff --git a/src/main/java/com/github/jcustenborder/kafka/connect/spooldir/SpoolDirSourceConnectorConfig.java b/src/main/java/com/github/jcustenborder/kafka/connect/spooldir/SpoolDirSourceConnectorConfig.java index 77bf7a8..aaefef2 100644 --- a/src/main/java/com/github/jcustenborder/kafka/connect/spooldir/SpoolDirSourceConnectorConfig.java +++ b/src/main/java/com/github/jcustenborder/kafka/connect/spooldir/SpoolDirSourceConnectorConfig.java @@ -1,12 +1,12 @@ /** * Copyright © 2016 Jeremy Custenborder (jcustenborder@gmail.com) - *

+ * * Licensed under the Apache License, Version 2.0 (the "License"); * you may not use this file except in compliance with the License. * You may obtain a copy of the License at - *

+ * * http://www.apache.org/licenses/LICENSE-2.0 - *

+ * * Unless required by applicable law or agreed to in writing, software * distributed under the License is distributed on an "AS IS" BASIS, * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. @@ -49,7 +49,7 @@ @SuppressWarnings("WeakerAccess") -abstract class SpoolDirSourceConnectorConfig extends AbstractConfig { +public abstract class SpoolDirSourceConnectorConfig extends AbstractConfig { public static final String TIMESTAMP_FIELD_CONF = "timestamp.field"; public static final String TIMESTAMP_MODE_CONF = "timestamp.mode"; //DirectoryMonitorConfig @@ -174,13 +174,13 @@ public SpoolDirSourceConnectorConfig(final boolean isTask, ConfigDef configDef, if (!this.schemaGenerationEnabled) { Preconditions.checkNotNull( this.keySchema, - "'%s' must be set if '%s' = true.", + "'%s' must be set if '%s' = false.", KEY_SCHEMA_CONF, SCHEMA_GENERATION_ENABLED_CONF ); Preconditions.checkNotNull( this.valueSchema, - "'%s' must be set if '%s' = true.", + "'%s' must be set if '%s' = false.", VALUE_SCHEMA_CONF, SCHEMA_GENERATION_ENABLED_CONF ); @@ -260,7 +260,7 @@ public SpoolDirSourceConnectorConfig(final boolean isTask, ConfigDef configDef, this.timestampField = null; } - if (isTask && null == this.valueSchema) { + if (schemasRequired() && (isTask && null == this.valueSchema)) { throw new DataException( String.format("'%s' must be set to a valid schema.", VALUE_SCHEMA_CONF) ); @@ -271,6 +271,9 @@ public SpoolDirSourceConnectorConfig(final boolean isTask, ConfigDef configDef, this.inputFilenameFilter = new PatternFilenameFilter(inputPattern); } + public abstract boolean schemasRequired(); + + private static final Field findMetadataField(Schema schema) { Field result = null; for (Field field : schema.fields()) { @@ -343,7 +346,7 @@ public boolean visible(String key, Map settings) { ConfigKeyBuilder.of(EMPTY_POLL_WAIT_MS_CONF, Type.LONG) .documentation(EMPTY_POLL_WAIT_MS_DOC) .importance(ConfigDef.Importance.LOW) - .defaultValue(1000L) + .defaultValue(250L) .validator(ConfigDef.Range.between(1L, Long.MAX_VALUE)) .group(GROUP_GENERAL) .build() diff --git a/src/main/java/com/github/jcustenborder/kafka/connect/spooldir/SpoolDirSourceTask.java b/src/main/java/com/github/jcustenborder/kafka/connect/spooldir/SpoolDirSourceTask.java index 141431c..5ac90ae 100644 --- a/src/main/java/com/github/jcustenborder/kafka/connect/spooldir/SpoolDirSourceTask.java +++ b/src/main/java/com/github/jcustenborder/kafka/connect/spooldir/SpoolDirSourceTask.java @@ -1,12 +1,12 @@ /** * Copyright © 2016 Jeremy Custenborder (jcustenborder@gmail.com) - *

+ * * Licensed under the Apache License, Version 2.0 (the "License"); * you may not use this file except in compliance with the License. * You may obtain a copy of the License at - *

+ * * http://www.apache.org/licenses/LICENSE-2.0 - *

+ * * Unless required by applicable law or agreed to in writing, software * distributed under the License is distributed on an "AS IS" BASIS, * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. @@ -51,8 +51,8 @@ public abstract class SpoolDirSourceTask sourcePartition; - CONF config; - Stopwatch processingTime = Stopwatch.createStarted(); + protected CONF config; + private Stopwatch processingTime = Stopwatch.createStarted(); private File inputFile; private long inputFileModifiedTime; private InputStream inputStream; @@ -125,6 +125,8 @@ public String version() { return VersionUtil.version(this.getClass()); } + InputFileDequeue inputFileDequeue; + @Override public void start(Map settings) { this.config = config(settings); @@ -143,6 +145,8 @@ Time.SCHEMA, new TimeTypeParser(this.config.parserTimestampTimezone, this.config for (Map.Entry kvp : dateTypeParsers.entrySet()) { this.parser.registerTypeParser(kvp.getKey(), kvp.getValue()); } + + this.inputFileDequeue = new InputFileDequeue(this.config); } @Override @@ -150,16 +154,22 @@ public void stop() { } + int emptyCount = 0; + @Override public List poll() throws InterruptedException { log.trace("poll()"); List results = read(); if (results.isEmpty()) { - log.trace("read() returned empty list. Sleeping {} ms.", this.config.emptyPollWaitMs); - Thread.sleep(this.config.emptyPollWaitMs); + emptyCount++; + if (emptyCount > 1) { + log.trace("read() returned empty list. Sleeping {} ms.", this.config.emptyPollWaitMs); + Thread.sleep(this.config.emptyPollWaitMs); + } + return results; } - + emptyCount = 0; log.trace("read() returning {} result(s)", results.size()); return results; @@ -182,7 +192,7 @@ private void closeAndMoveToFinished(File outputDirectory, boolean errored) throw Files.move(this.inputFile, finishedFile); - File processingFile = processingFile(this.inputFile); + File processingFile = InputFileDequeue.processingFile(this.config.processingFileExtension, this.inputFile); if (processingFile.exists()) { log.info("Removing processing file {}", processingFile); processingFile.delete(); @@ -191,48 +201,44 @@ private void closeAndMoveToFinished(File outputDirectory, boolean errored) throw } } - File processingFile(File input) { - String fileName = input.getName() + this.config.processingFileExtension; - return new File(input.getParentFile(), fileName); - } - - File findNextInputFile() { - File[] input = this.config.inputPath.listFiles(this.config.inputFilenameFilter); - if (null == input || input.length == 0) { - log.debug("No files matching {} were found in {}", SpoolDirSourceConnectorConfig.INPUT_FILE_PATTERN_CONF, this.config.inputPath); - return null; - } - List files = new ArrayList<>(input.length); - for (File f : input) { - File processingFile = processingFile(f); - log.trace("Checking for processing file: {}", processingFile); - - if (processingFile.exists()) { - log.debug("Skipping {} because processing file exists.", f); - continue; - } - files.add(f); - } - - File result = null; - - for (File file : files) { - long fileAgeMS = System.currentTimeMillis() - file.lastModified(); - - if (fileAgeMS < 0L) { - log.warn("File {} has a date in the future.", file); - } - if (this.config.minimumFileAgeMS > 0L && fileAgeMS < this.config.minimumFileAgeMS) { - log.debug("Skipping {} because it does not meet the minimum age.", file); - continue; - } - result = file; - break; - } - - return result; - } +// File findNextInputFile() { +// File[] input = this.config.inputPath.listFiles(this.config.inputFilenameFilter); +// if (null == input || input.length == 0) { +// log.debug("No files matching {} were found in {}", SpoolDirSourceConnectorConfig.INPUT_FILE_PATTERN_CONF, this.config.inputPath); +// return null; +// } +// List files = new ArrayList<>(input.length); +// for (File f : input) { +// File processingFile = InputFileDequeue.processingFile(this.config, f); +// log.trace("Checking for processing file: {}", processingFile); +// +// if (processingFile.exists()) { +// log.debug("Skipping {} because processing file exists.", f); +// continue; +// } +// files.add(f); +// } +// +// File result = null; +// +// for (File file : files) { +// long fileAgeMS = System.currentTimeMillis() - file.lastModified(); +// +// if (fileAgeMS < 0L) { +// log.warn("File {} has a date in the future.", file); +// } +// +// if (this.config.minimumFileAgeMS > 0L && fileAgeMS < this.config.minimumFileAgeMS) { +// log.debug("Skipping {} because it does not meet the minimum age.", file); +// continue; +// } +// result = file; +// break; +// } +// +// return result; +// } static final Map SUPPORTED_COMPRESSION_TYPES = ImmutableMap.of( "bz2", CompressorStreamFactory.BZIP2, @@ -247,7 +253,7 @@ public List read() { if (!hasRecords) { closeAndMoveToFinished(this.config.finishedPath, false); - File nextFile = findNextInputFile(); + File nextFile = this.inputFileDequeue.poll(); if (null == nextFile) { return new ArrayList<>(); } @@ -255,7 +261,7 @@ public List read() { this.metadata = ImmutableMap.of(); this.inputFile = nextFile; this.inputFileModifiedTime = this.inputFile.lastModified(); - File processingFile = processingFile(this.inputFile); + File processingFile = InputFileDequeue.processingFile(this.config.processingFileExtension, this.inputFile); Files.touch(processingFile); try { @@ -351,9 +357,9 @@ protected void addRecord(List records, Struct keyStruct, Struct va sourceOffset, this.config.topic, null, - this.config.keySchema, + null != keyStruct ? keyStruct.schema() : null, keyStruct, - this.config.valueSchema, + valueStruct.schema(), valueStruct, timestamp ); diff --git a/src/main/java/com/github/jcustenborder/kafka/connect/spooldir/elf/SchemaConversion.java b/src/main/java/com/github/jcustenborder/kafka/connect/spooldir/elf/SchemaConversion.java new file mode 100644 index 0000000..43a706c --- /dev/null +++ b/src/main/java/com/github/jcustenborder/kafka/connect/spooldir/elf/SchemaConversion.java @@ -0,0 +1,64 @@ +/** + * Copyright © 2016 Jeremy Custenborder (jcustenborder@gmail.com) + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package com.github.jcustenborder.kafka.connect.spooldir.elf; + +import com.github.jcustenborder.kafka.connect.spooldir.elf.converters.LogFieldConverter; +import com.github.jcustenborder.parsers.elf.LogEntry; +import org.apache.commons.lang3.tuple.ImmutablePair; +import org.apache.commons.lang3.tuple.Pair; +import org.apache.kafka.connect.data.Schema; +import org.apache.kafka.connect.data.Struct; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import java.util.List; + +public class SchemaConversion { + private static final Logger log = LoggerFactory.getLogger(SchemaConversion.class); + private final Schema keySchema; + private final Schema valueSchema; + private final List keyConverters; + private final List valueConverters; + + SchemaConversion(Schema keySchema, Schema valueSchema, List keyConverters, List valueConverters) { + this.keySchema = keySchema; + this.valueSchema = valueSchema; + this.keyConverters = keyConverters; + this.valueConverters = valueConverters; + } + + + public Pair convert(LogEntry entry) { + final Struct key = null != this.keySchema ? new Struct(this.keySchema) : null; + final Struct value = new Struct(this.valueSchema); + + if (null != key) { + for (LogFieldConverter converter : this.keyConverters) { + converter.convert(entry, key); + } + } + + for (LogFieldConverter converter : this.valueConverters) { + converter.convert(entry, value); + } + + if (null != key) { + key.validate(); + } + value.validate(); + return new ImmutablePair<>(key, value); + } +} diff --git a/src/main/java/com/github/jcustenborder/kafka/connect/spooldir/elf/SchemaConversionBuilder.java b/src/main/java/com/github/jcustenborder/kafka/connect/spooldir/elf/SchemaConversionBuilder.java new file mode 100644 index 0000000..b41b5b3 --- /dev/null +++ b/src/main/java/com/github/jcustenborder/kafka/connect/spooldir/elf/SchemaConversionBuilder.java @@ -0,0 +1,91 @@ +/** + * Copyright © 2016 Jeremy Custenborder (jcustenborder@gmail.com) + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package com.github.jcustenborder.kafka.connect.spooldir.elf; + +import com.github.jcustenborder.kafka.connect.spooldir.elf.converters.LogFieldConverter; +import com.github.jcustenborder.kafka.connect.spooldir.elf.converters.LogFieldConverterFactory; +import com.github.jcustenborder.parsers.elf.ElfParser; +import com.google.common.base.Preconditions; +import org.apache.kafka.connect.data.Schema; +import org.apache.kafka.connect.data.SchemaBuilder; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import java.time.LocalDate; +import java.time.LocalTime; +import java.util.ArrayList; +import java.util.List; +import java.util.Map; + +public class SchemaConversionBuilder { + private static final Logger log = LoggerFactory.getLogger(SchemaConversionBuilder.class); + final ElfParser parser; + final SpoolDirELFSourceConnectorConfig config; + + public SchemaConversionBuilder(ElfParser parser, SpoolDirELFSourceConnectorConfig config) { + this.parser = parser; + this.config = config; + } + + static String normalizeFieldName(String fieldName) { + Preconditions.checkNotNull(fieldName, "fieldname cannot be null."); + final String result = fieldName.replace('(', '_') + .replace(")", "") + .replace('-', '_') + .toLowerCase(); + return result; + } + + + public SchemaConversion build() { + log.trace("build() - Building SchemaConversion"); + + final SchemaBuilder valueBuilder = SchemaBuilder.struct(); + valueBuilder.name("com.github.jcustenborder.kafka.connect.spooldir.LogEntry"); + + LogFieldConverterFactory factory = new LogFieldConverterFactory(); + List valueConverters = new ArrayList<>(); + + for (Map.Entry> entry : this.parser.fieldTypes().entrySet()) { + final String logFieldName = entry.getKey(); + final Class logFieldClass = entry.getValue(); + final String connectFieldName = normalizeFieldName(logFieldName); + log.trace("build() - Mapping log field '{}' to schema field '{}'", logFieldName, connectFieldName); + final LogFieldConverter converter = factory.create( + valueBuilder, + logFieldClass, + logFieldName, + connectFieldName + ); + valueConverters.add(converter); + } + + if (LocalDate.class.equals(this.parser.fieldTypes().get("date")) && LocalTime.class.equals(this.parser.fieldTypes().get("time"))) { + log.trace("build() - found date and time field. Creating datetime field."); + final LogFieldConverter converter = factory.createDateTime( + valueBuilder, + "date", + "time", + "datetime" + ); + valueConverters.add(converter); + } + + final Schema valueSchema = valueBuilder.build(); + + return new SchemaConversion(null, valueSchema, null, valueConverters); + } +} diff --git a/src/main/java/com/github/jcustenborder/kafka/connect/spooldir/elf/SpoolDirELFSourceConnector.java b/src/main/java/com/github/jcustenborder/kafka/connect/spooldir/elf/SpoolDirELFSourceConnector.java new file mode 100644 index 0000000..61f54d8 --- /dev/null +++ b/src/main/java/com/github/jcustenborder/kafka/connect/spooldir/elf/SpoolDirELFSourceConnector.java @@ -0,0 +1,51 @@ +/** + * Copyright © 2016 Jeremy Custenborder (jcustenborder@gmail.com) + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package com.github.jcustenborder.kafka.connect.spooldir.elf; + +import com.github.jcustenborder.kafka.connect.spooldir.SchemaGenerator; +import com.github.jcustenborder.kafka.connect.spooldir.SpoolDirSourceConnector; +import com.github.jcustenborder.kafka.connect.utils.config.Description; +import com.github.jcustenborder.kafka.connect.utils.config.Title; +import org.apache.kafka.common.config.ConfigDef; +import org.apache.kafka.connect.connector.Task; + +import java.util.Map; + +@Title("Extended Log File Format Source Connector") +@Description("This connector is used to stream `Extended Log File Format ` " + + "files from a directory while converting the data to a strongly typed schema.") +public class SpoolDirELFSourceConnector extends SpoolDirSourceConnector { + + @Override + protected SpoolDirELFSourceConnectorConfig config(Map settings) { + return new SpoolDirELFSourceConnectorConfig(false, settings); + } + + @Override + protected SchemaGenerator generator(Map settings) { + return null; + } + + @Override + public Class taskClass() { + return SpoolDirELFSourceTask.class; + } + + @Override + public ConfigDef config() { + return SpoolDirELFSourceConnectorConfig.config(); + } +} diff --git a/src/main/java/com/github/jcustenborder/kafka/connect/spooldir/elf/SpoolDirELFSourceConnectorConfig.java b/src/main/java/com/github/jcustenborder/kafka/connect/spooldir/elf/SpoolDirELFSourceConnectorConfig.java new file mode 100644 index 0000000..3058689 --- /dev/null +++ b/src/main/java/com/github/jcustenborder/kafka/connect/spooldir/elf/SpoolDirELFSourceConnectorConfig.java @@ -0,0 +1,47 @@ +/** + * Copyright © 2016 Jeremy Custenborder (jcustenborder@gmail.com) + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package com.github.jcustenborder.kafka.connect.spooldir.elf; + +import com.github.jcustenborder.kafka.connect.spooldir.SpoolDirSourceConnectorConfig; +import org.apache.kafka.common.config.ConfigDef; + +import java.util.Map; + +class SpoolDirELFSourceConnectorConfig extends SpoolDirSourceConnectorConfig { + public SpoolDirELFSourceConnectorConfig(final boolean isTask, Map settings) { + super(isTask, config(), settings); + } + + public static final String KEY_FIELDS_CONFIG = "key.fields"; + public static final String KEY_FIELDS_DOC = "key.fields"; + + + @Override + public boolean schemasRequired() { + return false; + } + + public static ConfigDef config() { + return SpoolDirSourceConnectorConfig.config(); +// .define( +// ConfigKeyBuilder.of(KEY_FIELDS_CONFIG, ConfigDef.Type.LIST) +// .documentation(KEY_FIELDS_DOC) +// .importance(ConfigDef.Importance.HIGH) +// .build() +// ); + } + +} diff --git a/src/main/java/com/github/jcustenborder/kafka/connect/spooldir/elf/SpoolDirELFSourceTask.java b/src/main/java/com/github/jcustenborder/kafka/connect/spooldir/elf/SpoolDirELFSourceTask.java new file mode 100644 index 0000000..fd1da4f --- /dev/null +++ b/src/main/java/com/github/jcustenborder/kafka/connect/spooldir/elf/SpoolDirELFSourceTask.java @@ -0,0 +1,103 @@ +/** + * Copyright © 2016 Jeremy Custenborder (jcustenborder@gmail.com) + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package com.github.jcustenborder.kafka.connect.spooldir.elf; + +import com.github.jcustenborder.kafka.connect.spooldir.SpoolDirSourceTask; +import com.github.jcustenborder.parsers.elf.ElfParser; +import com.github.jcustenborder.parsers.elf.ElfParserBuilder; +import com.github.jcustenborder.parsers.elf.LogEntry; +import org.apache.commons.lang3.tuple.Pair; +import org.apache.kafka.connect.data.Struct; +import org.apache.kafka.connect.errors.ConnectException; +import org.apache.kafka.connect.source.SourceRecord; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import java.io.IOException; +import java.io.InputStream; +import java.util.ArrayList; +import java.util.List; +import java.util.Map; + +public class SpoolDirELFSourceTask extends SpoolDirSourceTask { + private static final Logger log = LoggerFactory.getLogger(SpoolDirELFSourceTask.class); + ElfParser parser; + ElfParserBuilder parserBuilder; + SchemaConversion conversion; + long offset; + + @Override + protected SpoolDirELFSourceConnectorConfig config(Map settings) { + return new SpoolDirELFSourceConnectorConfig(true, settings); + } + + @Override + public void start(Map settings) { + super.start(settings); + this.parserBuilder = ElfParserBuilder.of(); + } + + + @Override + protected void configure(InputStream inputStream, Map metadata, Long lastOffset) throws IOException { + if (null != this.parser) { + log.trace("configure() - Closing existing parser."); + this.parser.close(); + } + + this.parser = this.parserBuilder.build(inputStream); + SchemaConversionBuilder builder = new SchemaConversionBuilder(this.parser, this.config); + this.conversion = builder.build(); + + this.offset = -1; + + if (null != lastOffset) { + int skippedRecords = 1; + while (null != next() && skippedRecords <= lastOffset) { + skippedRecords++; + } + log.trace("configure() - Skipped {} record(s).", skippedRecords); + log.info("configure() - Starting on offset {}", this.offset); + } + } + + LogEntry next() throws IOException { + this.offset++; + return this.parser.next(); + } + + @Override + protected List process() { + List records = new ArrayList<>(this.config.batchSize); + + LogEntry entry; + try { + while (null != (entry = this.parser.next()) && records.size() < this.config.batchSize) { + Pair converted = conversion.convert(entry); + addRecord(records, converted.getKey(), converted.getValue()); + } + } catch (IOException ex) { + throw new ConnectException(ex); + } + + return records; + } + + @Override + protected long recordOffset() { + return this.offset; + } +} diff --git a/src/main/java/com/github/jcustenborder/kafka/connect/spooldir/elf/converters/LocalDateLogFieldConverter.java b/src/main/java/com/github/jcustenborder/kafka/connect/spooldir/elf/converters/LocalDateLogFieldConverter.java new file mode 100644 index 0000000..275d2ca --- /dev/null +++ b/src/main/java/com/github/jcustenborder/kafka/connect/spooldir/elf/converters/LocalDateLogFieldConverter.java @@ -0,0 +1,38 @@ +/** + * Copyright © 2016 Jeremy Custenborder (jcustenborder@gmail.com) + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package com.github.jcustenborder.kafka.connect.spooldir.elf.converters; + +import org.apache.kafka.connect.data.Field; + +import java.sql.Date; +import java.time.Instant; +import java.time.LocalDate; +import java.time.ZoneId; + +public class LocalDateLogFieldConverter extends LogFieldConverter { + private static final ZoneId ZONE_ID = ZoneId.of("UTC"); + + @Override + protected Object convert(Object input) { + final LocalDate localDate = (LocalDate) input; + final Instant instant = localDate.atStartOfDay(ZONE_ID).toInstant(); + return Date.from(instant); + } + + public LocalDateLogFieldConverter(String logFieldName, Field field) { + super(logFieldName, field); + } +} diff --git a/src/main/java/com/github/jcustenborder/kafka/connect/spooldir/elf/converters/LocalTimeLogFieldConverter.java b/src/main/java/com/github/jcustenborder/kafka/connect/spooldir/elf/converters/LocalTimeLogFieldConverter.java new file mode 100644 index 0000000..691df82 --- /dev/null +++ b/src/main/java/com/github/jcustenborder/kafka/connect/spooldir/elf/converters/LocalTimeLogFieldConverter.java @@ -0,0 +1,39 @@ +/** + * Copyright © 2016 Jeremy Custenborder (jcustenborder@gmail.com) + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package com.github.jcustenborder.kafka.connect.spooldir.elf.converters; + +import org.apache.kafka.connect.data.Field; + +import java.sql.Date; +import java.time.Instant; +import java.time.LocalDate; +import java.time.LocalTime; +import java.time.ZoneOffset; + +public class LocalTimeLogFieldConverter extends LogFieldConverter { + private static final LocalDate EPOCH_DATE = LocalDate.ofEpochDay(0); + + @Override + protected Object convert(Object input) { + final LocalTime localTime = (LocalTime) input; + final Instant instant = localTime.atDate(EPOCH_DATE).toInstant(ZoneOffset.UTC); + return Date.from(instant); + } + + public LocalTimeLogFieldConverter(String logFieldName, Field field) { + super(logFieldName, field); + } +} diff --git a/src/main/java/com/github/jcustenborder/kafka/connect/spooldir/elf/converters/LogFieldConverter.java b/src/main/java/com/github/jcustenborder/kafka/connect/spooldir/elf/converters/LogFieldConverter.java new file mode 100644 index 0000000..bb28253 --- /dev/null +++ b/src/main/java/com/github/jcustenborder/kafka/connect/spooldir/elf/converters/LogFieldConverter.java @@ -0,0 +1,49 @@ +/** + * Copyright © 2016 Jeremy Custenborder (jcustenborder@gmail.com) + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package com.github.jcustenborder.kafka.connect.spooldir.elf.converters; + +import com.github.jcustenborder.parsers.elf.LogEntry; +import org.apache.kafka.connect.data.Field; +import org.apache.kafka.connect.data.Struct; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +public abstract class LogFieldConverter { + private static final Logger log = LoggerFactory.getLogger(LogFieldConverter.class); + protected final String logFieldName; + protected final Field field; + + protected abstract Object convert(Object input); + + public LogFieldConverter(String logFieldName, Field field) { + this.logFieldName = logFieldName; + this.field = field; + } + + public void convert(LogEntry logEntry, Struct struct) { + final Object input = logEntry.fieldData().get(this.logFieldName); + final Object output; + if (null == input) { + output = null; + } else { + output = convert(input); + } + + log.trace("convert() - Setting {} to {}", field.name(), output); + struct.put(this.field, output); + } + +} diff --git a/src/main/java/com/github/jcustenborder/kafka/connect/spooldir/elf/converters/LogFieldConverterFactory.java b/src/main/java/com/github/jcustenborder/kafka/connect/spooldir/elf/converters/LogFieldConverterFactory.java new file mode 100644 index 0000000..af5bf19 --- /dev/null +++ b/src/main/java/com/github/jcustenborder/kafka/connect/spooldir/elf/converters/LogFieldConverterFactory.java @@ -0,0 +1,136 @@ +/** + * Copyright © 2016 Jeremy Custenborder (jcustenborder@gmail.com) + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package com.github.jcustenborder.kafka.connect.spooldir.elf.converters; + +import org.apache.kafka.connect.data.Date; +import org.apache.kafka.connect.data.Field; +import org.apache.kafka.connect.data.Schema; +import org.apache.kafka.connect.data.SchemaBuilder; +import org.apache.kafka.connect.data.Time; +import org.apache.kafka.connect.data.Timestamp; + +import java.time.LocalDate; +import java.time.LocalTime; + +public class LogFieldConverterFactory { + + static Schema schema(Class logClass, String logFieldName) { + final SchemaBuilder builder; + if (LocalDate.class.equals(logClass)) { + builder = Date.builder(); + } else if (LocalTime.class.equals(logClass)) { + builder = Time.builder(); + } else if (Integer.class.equals(logClass)) { + builder = SchemaBuilder.int32(); + } else if (Long.class.equals(logClass)) { + builder = SchemaBuilder.int64(); + } else if (String.class.equals(logClass)) { + builder = SchemaBuilder.string(); + } else { + throw new UnsupportedOperationException( + String.format("%s is not a supported type.", logClass.getName()) + ); + } + builder.optional(); + + + return builder.build(); + } + + private static final String LOGFIELD_PARAM = "logField"; + + public LogFieldConverter create( + SchemaBuilder builder, + Class logClass, + String logFieldName, + String schemaFieldName) { + + final Schema fieldSchema; + final Field field; + final LogFieldConverter converter; + + if (LocalDate.class.equals(logClass)) { + fieldSchema = Date.builder() + .optional() + .parameter(LOGFIELD_PARAM, logFieldName) + .build(); + builder.field(schemaFieldName, fieldSchema); + field = builder.field(schemaFieldName); + converter = new LocalDateLogFieldConverter(logFieldName, field); + } else if (LocalTime.class.equals(logClass)) { + fieldSchema = Time.builder() + .optional() + .parameter(LOGFIELD_PARAM, logFieldName) + .build(); + builder.field(schemaFieldName, fieldSchema); + field = builder.field(schemaFieldName); + converter = new LocalTimeLogFieldConverter(logFieldName, field); + } else if (Integer.class.equals(logClass)) { + fieldSchema = SchemaBuilder.int32() + .optional() + .parameter(LOGFIELD_PARAM, logFieldName) + .build(); + builder.field(schemaFieldName, fieldSchema); + field = builder.field(schemaFieldName); + converter = new PrimitiveLogFieldConverter(logFieldName, field); + } else if (Long.class.equals(logClass)) { + fieldSchema = SchemaBuilder.int64() + .optional() + .parameter(LOGFIELD_PARAM, logFieldName) + .build(); + builder.field(schemaFieldName, fieldSchema); + field = builder.field(schemaFieldName); + converter = new PrimitiveLogFieldConverter(logFieldName, field); + } else if (String.class.equals(logClass)) { + fieldSchema = SchemaBuilder.string() + .optional() + .parameter(LOGFIELD_PARAM, logFieldName) + .build(); + builder.field(schemaFieldName, fieldSchema); + field = builder.field(schemaFieldName); + converter = new PrimitiveLogFieldConverter(logFieldName, field); + } else if (Double.class.equals(logClass)) { + fieldSchema = SchemaBuilder.float64() + .optional() + .parameter(LOGFIELD_PARAM, logFieldName) + .build(); + builder.field(schemaFieldName, fieldSchema); + field = builder.field(schemaFieldName); + converter = new PrimitiveLogFieldConverter(logFieldName, field); + } else { + throw new UnsupportedOperationException( + String.format("%s is not a supported type.", logClass.getName()) + ); + } + + return converter; + } + + public LogFieldConverter createDateTime(SchemaBuilder builder, String logEntryDateField, String logEntryTimeField, String connectTimestampField) { + final Schema fieldSchema = Timestamp.builder() + .optional() + .parameter(LOGFIELD_PARAM, String.format("%s,%s", logEntryDateField, logEntryTimeField)) + .build(); + builder.field(connectTimestampField, fieldSchema); + final Field field = builder.field(connectTimestampField); + final LogFieldConverter converter = new TimestampLogFieldConverter( + field, + logEntryTimeField, + logEntryDateField + ); + return converter; + } +} diff --git a/src/main/java/com/github/jcustenborder/kafka/connect/spooldir/elf/converters/PrimitiveLogFieldConverter.java b/src/main/java/com/github/jcustenborder/kafka/connect/spooldir/elf/converters/PrimitiveLogFieldConverter.java new file mode 100644 index 0000000..c9eda50 --- /dev/null +++ b/src/main/java/com/github/jcustenborder/kafka/connect/spooldir/elf/converters/PrimitiveLogFieldConverter.java @@ -0,0 +1,29 @@ +/** + * Copyright © 2016 Jeremy Custenborder (jcustenborder@gmail.com) + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package com.github.jcustenborder.kafka.connect.spooldir.elf.converters; + +import org.apache.kafka.connect.data.Field; + +public class PrimitiveLogFieldConverter extends LogFieldConverter { + @Override + protected Object convert(Object input) { + return input; + } + + public PrimitiveLogFieldConverter(String logFieldName, Field field) { + super(logFieldName, field); + } +} diff --git a/src/main/java/com/github/jcustenborder/kafka/connect/spooldir/elf/converters/TimestampLogFieldConverter.java b/src/main/java/com/github/jcustenborder/kafka/connect/spooldir/elf/converters/TimestampLogFieldConverter.java new file mode 100644 index 0000000..698a599 --- /dev/null +++ b/src/main/java/com/github/jcustenborder/kafka/connect/spooldir/elf/converters/TimestampLogFieldConverter.java @@ -0,0 +1,58 @@ +/** + * Copyright © 2016 Jeremy Custenborder (jcustenborder@gmail.com) + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package com.github.jcustenborder.kafka.connect.spooldir.elf.converters; + +import com.github.jcustenborder.parsers.elf.LogEntry; +import org.apache.kafka.connect.data.Field; +import org.apache.kafka.connect.data.Struct; + +import java.sql.Date; +import java.time.Instant; +import java.time.LocalDate; +import java.time.LocalTime; +import java.time.ZoneOffset; + +public class TimestampLogFieldConverter extends LogFieldConverter { + private final String timeField; + private final String dateField; + + public TimestampLogFieldConverter(Field field, String timeField, String dateField) { + super(null, field); + this.timeField = timeField; + this.dateField = dateField; + } + + @Override + protected Object convert(Object input) { + return null; + } + + @Override + public void convert(LogEntry logEntry, Struct struct) { + final LocalDate date = (LocalDate) logEntry.fieldData().get(this.dateField); + final LocalTime time = (LocalTime) logEntry.fieldData().get(this.timeField); + + final Object value; + + if (null == date || null == time) { + value = null; + } else { + final Instant instant = time.atDate(date).toInstant(ZoneOffset.UTC); + value = Date.from(instant); + } + struct.put(this.field, value); + } +} diff --git a/src/test/java/com/github/jcustenborder/kafka/connect/spooldir/SchemaGeneratorTest.java b/src/test/java/com/github/jcustenborder/kafka/connect/spooldir/SchemaGeneratorTest.java index 6813929..6fda746 100644 --- a/src/test/java/com/github/jcustenborder/kafka/connect/spooldir/SchemaGeneratorTest.java +++ b/src/test/java/com/github/jcustenborder/kafka/connect/spooldir/SchemaGeneratorTest.java @@ -1,3 +1,18 @@ +/** + * Copyright © 2016 Jeremy Custenborder (jcustenborder@gmail.com) + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ package com.github.jcustenborder.kafka.connect.spooldir; import com.google.common.io.Files; diff --git a/src/test/java/com/github/jcustenborder/kafka/connect/spooldir/SpoolDirSourceTaskTest.java b/src/test/java/com/github/jcustenborder/kafka/connect/spooldir/SpoolDirSourceTaskTest.java index e774461..c99428b 100644 --- a/src/test/java/com/github/jcustenborder/kafka/connect/spooldir/SpoolDirSourceTaskTest.java +++ b/src/test/java/com/github/jcustenborder/kafka/connect/spooldir/SpoolDirSourceTaskTest.java @@ -1,12 +1,12 @@ /** * Copyright © 2016 Jeremy Custenborder (jcustenborder@gmail.com) - *

+ * * Licensed under the Apache License, Version 2.0 (the "License"); * you may not use this file except in compliance with the License. * You may obtain a copy of the License at - *

+ * * http://www.apache.org/licenses/LICENSE-2.0 - *

+ * * Unless required by applicable law or agreed to in writing, software * distributed under the License is distributed on an "AS IS" BASIS, * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. @@ -57,7 +57,7 @@ public abstract class SpoolDirSourceTaskTest { protected T task; @BeforeEach - public void setup() throws JsonProcessingException { + public void setup() { this.tempDirectory = Files.createTempDir(); this.finishedPath = new File(this.tempDirectory, "finished"); this.inputPath = new File(this.tempDirectory, "input"); @@ -115,7 +115,7 @@ protected void poll(final String packageName, TestCase testCase) throws Interrup final File inputFile = new File(this.inputPath, inputFileName); log.trace("poll(String, TestCase) - inputFile = {}", inputFile); - final File processingFile = this.task.processingFile(inputFile); + final File processingFile = InputFileDequeue.processingFile(SpoolDirSourceConnectorConfig.PROCESSING_FILE_EXTENSION_DEFAULT, inputFile); try (InputStream inputStream = this.getClass().getResourceAsStream(dataFile)) { try (OutputStream outputStream = new FileOutputStream(inputFile)) { ByteStreams.copy(inputStream, outputStream); diff --git a/src/test/java/com/github/jcustenborder/kafka/connect/spooldir/elf/SchemaConversionBuilderTest.java b/src/test/java/com/github/jcustenborder/kafka/connect/spooldir/elf/SchemaConversionBuilderTest.java new file mode 100644 index 0000000..5ee17df --- /dev/null +++ b/src/test/java/com/github/jcustenborder/kafka/connect/spooldir/elf/SchemaConversionBuilderTest.java @@ -0,0 +1,109 @@ +/** + * Copyright © 2016 Jeremy Custenborder (jcustenborder@gmail.com) + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package com.github.jcustenborder.kafka.connect.spooldir.elf; + +import com.github.jcustenborder.parsers.elf.ElfParser; +import com.github.jcustenborder.parsers.elf.LogEntry; +import com.google.common.collect.ImmutableMap; +import org.apache.commons.lang3.tuple.Pair; +import org.apache.kafka.connect.data.Struct; +import org.junit.jupiter.api.DynamicTest; +import org.junit.jupiter.api.Test; +import org.junit.jupiter.api.TestFactory; + +import java.time.LocalDate; +import java.time.LocalTime; +import java.util.LinkedHashMap; +import java.util.Map; +import java.util.stream.Stream; + +import static org.junit.jupiter.api.Assertions.assertEquals; +import static org.junit.jupiter.api.Assertions.assertNotNull; +import static org.junit.jupiter.api.DynamicTest.dynamicTest; +import static org.mockito.Mockito.mock; +import static org.mockito.Mockito.when; + +public class SchemaConversionBuilderTest { + + @TestFactory + public Stream normalizeFieldName() { + Map tests = new LinkedHashMap<>(); + tests.put("date", "date"); + tests.put("time", "time"); + tests.put("x-edge-location", "x_edge_location"); + tests.put("sc-bytes", "sc_bytes"); + tests.put("c-ip", "c_ip"); + tests.put("cs-method", "cs_method"); + tests.put("cs(Host)", "cs_host"); + tests.put("cs-uri-stem", "cs_uri_stem"); + tests.put("sc-status", "sc_status"); + tests.put("cs(Referer)", "cs_referer"); + tests.put("cs(User-Agent)", "cs_user_agent"); + tests.put("cs-uri-query", "cs_uri_query"); + tests.put("cs(Cookie)", "cs_cookie"); + tests.put("x-edge-result-type", "x_edge_result_type"); + tests.put("x-edge-request-id", "x_edge_request_id"); + tests.put("x-host-header", "x_host_header"); + tests.put("cs-protocol", "cs_protocol"); + tests.put("cs-bytes", "cs_bytes"); + tests.put("time-taken", "time_taken"); + + return tests.entrySet().stream().map(e -> dynamicTest(e.getKey(), () -> { + final String actual = SchemaConversionBuilder.normalizeFieldName(e.getKey()); + assertEquals(e.getValue(), actual, "field name does not match."); + })); + } + + + @Test + public void foo() { + ElfParser parser = mock(ElfParser.class); + final Map> fieldTypes = ImmutableMap.of( + "date", LocalDate.class, + "time", LocalTime.class, + "sc-bytes", Long.class, + "sc-status", Integer.class + ); + final Map fieldData = ImmutableMap.of( + "date", LocalDate.of(2011, 3, 14), + "time", LocalTime.of(12, 0, 0), + "sc-bytes", 12341L, + "sc-status", 200 + ); + when(parser.fieldTypes()).thenReturn(fieldTypes); + + SchemaConversionBuilder schemaGenerator = new SchemaConversionBuilder(parser, null); + SchemaConversion conversion = schemaGenerator.build(); + assertNotNull(conversion, "conversion should not be null."); + + LogEntry entry = mock(LogEntry.class); + when(entry.fieldTypes()).thenReturn(fieldTypes); + when(entry.fieldData()).thenReturn(fieldData); + + Pair actual = conversion.convert(entry); + assertNotNull(actual, "actual should not be null"); +// assertNotNull(actual.getKey(), "actual.getKey() should not be null"); + assertNotNull(actual.getValue(), "actual.getValue() should not be null"); + + actual.getValue().validate(); + +//date time x-edge-location sc-bytes c-ip cs-method cs(Host) cs-uri-stem sc-status cs(Referer) cs(User-Agent) cs-uri-query cs(Cookie) x-edge-result-type x-edge-request-id x-host-header cs-protocol cs-bytes time-taken + + + } + + +} diff --git a/src/test/java/com/github/jcustenborder/kafka/connect/spooldir/elf/SpoolDirELFSourceTaskTest.java b/src/test/java/com/github/jcustenborder/kafka/connect/spooldir/elf/SpoolDirELFSourceTaskTest.java new file mode 100644 index 0000000..28b5771 --- /dev/null +++ b/src/test/java/com/github/jcustenborder/kafka/connect/spooldir/elf/SpoolDirELFSourceTaskTest.java @@ -0,0 +1,31 @@ +/** + * Copyright © 2016 Jeremy Custenborder (jcustenborder@gmail.com) + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package com.github.jcustenborder.kafka.connect.spooldir.elf; + +import org.junit.jupiter.api.Test; + +public class SpoolDirELFSourceTaskTest { + + @Test + public void foo() { + + + + } + + + +} diff --git a/src/test/resources/com/github/jcustenborder/kafka/connect/spooldir/SpoolDirCsvSourceConnector/tsv.json b/src/test/resources/com/github/jcustenborder/kafka/connect/spooldir/SpoolDirCsvSourceConnector/tsv.json index f3e767f..ecae79d 100644 --- a/src/test/resources/com/github/jcustenborder/kafka/connect/spooldir/SpoolDirCsvSourceConnector/tsv.json +++ b/src/test/resources/com/github/jcustenborder/kafka/connect/spooldir/SpoolDirCsvSourceConnector/tsv.json @@ -1,6 +1,6 @@ { - "name": "TSV", - "description": "This example will read a TSV", + "name": "TSV input file", + "description": "This example will read a tab separated file. This method is very similar to reading a standard CSV file.", "config": { "finished.path": "/tmp", "input.path": "/tmp",