From ac8c4e452402c16b4f25f31d357d8e2ba7bf0cad Mon Sep 17 00:00:00 2001 From: Jeremy Custenborder Date: Fri, 20 Apr 2018 16:38:03 -0500 Subject: [PATCH] Cleanup of configuration documentation. --- .../spooldir/SpoolDirCsvSourceConnector.java | 2 + .../SpoolDirCsvSourceConnectorConfig.java | 135 ++++++++-- .../spooldir/SpoolDirJsonSourceConnector.java | 2 + .../SpoolDirSourceConnectorConfig.java | 238 ++++++++++++++++-- .../kafka/connect/spooldir/package-info.java | 30 +++ 5 files changed, 368 insertions(+), 39 deletions(-) create mode 100644 src/main/java/com/github/jcustenborder/kafka/connect/spooldir/package-info.java diff --git a/src/main/java/com/github/jcustenborder/kafka/connect/spooldir/SpoolDirCsvSourceConnector.java b/src/main/java/com/github/jcustenborder/kafka/connect/spooldir/SpoolDirCsvSourceConnector.java index bcc0969..dc9d580 100644 --- a/src/main/java/com/github/jcustenborder/kafka/connect/spooldir/SpoolDirCsvSourceConnector.java +++ b/src/main/java/com/github/jcustenborder/kafka/connect/spooldir/SpoolDirCsvSourceConnector.java @@ -16,11 +16,13 @@ package com.github.jcustenborder.kafka.connect.spooldir; import com.github.jcustenborder.kafka.connect.utils.config.Description; +import com.github.jcustenborder.kafka.connect.utils.config.Title; import org.apache.kafka.common.config.ConfigDef; import org.apache.kafka.connect.connector.Task; import java.util.Map; +@Title("CSV Source Connector") @Description("The SpoolDirCsvSourceConnector will monitor the directory specified in `input.path` for files and read them as a CSV " + "converting each of the records to the strongly typed equivalent specified in `key.schema` and `value.schema`.") public class SpoolDirCsvSourceConnector extends SpoolDirSourceConnector { diff --git a/src/main/java/com/github/jcustenborder/kafka/connect/spooldir/SpoolDirCsvSourceConnectorConfig.java b/src/main/java/com/github/jcustenborder/kafka/connect/spooldir/SpoolDirCsvSourceConnectorConfig.java index 2dd953f..eeedd11 100644 --- a/src/main/java/com/github/jcustenborder/kafka/connect/spooldir/SpoolDirCsvSourceConnectorConfig.java +++ b/src/main/java/com/github/jcustenborder/kafka/connect/spooldir/SpoolDirCsvSourceConnectorConfig.java @@ -15,6 +15,7 @@ */ package com.github.jcustenborder.kafka.connect.spooldir; +import com.github.jcustenborder.kafka.connect.utils.config.ConfigKeyBuilder; import com.github.jcustenborder.kafka.connect.utils.config.ConfigUtils; import com.github.jcustenborder.kafka.connect.utils.config.ValidEnum; import com.google.common.base.Joiner; @@ -77,7 +78,7 @@ class SpoolDirCsvSourceConnectorConfig extends SpoolDirSourceConnectorConfig { static final String CSV_CHARSET_DEFAULT = Charset.defaultCharset().name(); static final String CSV_CASE_SENSITIVE_FIELD_NAMES_DOC = "Flag to determine if the field names in the header row should be treated as case sensitive."; - static final String CSV_GROUP = "csv"; + static final String CSV_GROUP = "CSV Parsing"; static final String CSV_DISPLAY_NAME = "CSV Settings"; private static final String CSV_QUOTE_CHAR_DOC = "The character that is used to quote a field. 
This typically happens when the " + CSV_SEPARATOR_CHAR_CONF + " character is within the data."; public final int skipLines; @@ -115,22 +116,126 @@ public SpoolDirCsvSourceConnectorConfig(final boolean isTask, Map set } static final ConfigDef conf() { - int csvPosition = 0; return SpoolDirSourceConnectorConfig.config() - .define(CSV_SKIP_LINES_CONF, ConfigDef.Type.INT, CSV_SKIP_LINES_DEFAULT, ConfigDef.Importance.LOW, CSV_SKIP_LINES_DOC, CSV_GROUP, csvPosition++, ConfigDef.Width.LONG, CSV_DISPLAY_NAME) - .define(CSV_SEPARATOR_CHAR_CONF, ConfigDef.Type.INT, CSV_SEPARATOR_CHAR_DEFAULT, ConfigDef.Importance.LOW, CSV_SEPARATOR_CHAR_DOC, CSV_GROUP, csvPosition++, ConfigDef.Width.LONG, CSV_DISPLAY_NAME) - .define(CSV_QUOTE_CHAR_CONF, ConfigDef.Type.INT, CSV_QUOTE_CHAR_DEFAULT, ConfigDef.Importance.LOW, CSV_QUOTE_CHAR_DOC, CSV_GROUP, csvPosition++, ConfigDef.Width.LONG, CSV_DISPLAY_NAME) - .define(CSV_ESCAPE_CHAR_CONF, ConfigDef.Type.INT, CSV_ESCAPE_CHAR_DEFAULT, ConfigDef.Importance.LOW, CSV_ESCAPE_CHAR_DOC, CSV_GROUP, csvPosition++, ConfigDef.Width.LONG, CSV_DISPLAY_NAME) - .define(CSV_STRICT_QUOTES_CONF, ConfigDef.Type.BOOLEAN, CSV_STRICT_QUOTES_DEFAULT, ConfigDef.Importance.LOW, CSV_STRICT_QUOTES_DOC, CSV_GROUP, csvPosition++, ConfigDef.Width.LONG, CSV_DISPLAY_NAME) - .define(CSV_IGNORE_LEADING_WHITESPACE_CONF, ConfigDef.Type.BOOLEAN, CSV_IGNORE_LEADING_WHITESPACE_DEFAULT, ConfigDef.Importance.LOW, CSV_IGNORE_LEADING_WHITESPACE_DOC, CSV_GROUP, csvPosition++, ConfigDef.Width.LONG, CSV_DISPLAY_NAME) - .define(CSV_IGNORE_QUOTATIONS_CONF, ConfigDef.Type.BOOLEAN, CSV_IGNORE_QUOTATIONS_DEFAULT, ConfigDef.Importance.LOW, CSV_IGNORE_QUOTATIONS_DOC, CSV_GROUP, csvPosition++, ConfigDef.Width.LONG, CSV_DISPLAY_NAME) - .define(CSV_KEEP_CARRIAGE_RETURN_CONF, ConfigDef.Type.BOOLEAN, CSV_KEEP_CARRIAGE_RETURN_DEFAULT, ConfigDef.Importance.LOW, CSV_KEEP_CARRIAGE_RETURN_DOC, CSV_GROUP, csvPosition++, ConfigDef.Width.LONG, CSV_DISPLAY_NAME) - .define(CSV_VERIFY_READER_CONF, ConfigDef.Type.BOOLEAN, CSV_VERIFY_READER_DEFAULT, ConfigDef.Importance.LOW, CSV_VERIFY_READER_DOC, CSV_GROUP, csvPosition++, ConfigDef.Width.LONG, CSV_DISPLAY_NAME) - .define(CSV_NULL_FIELD_INDICATOR_CONF, ConfigDef.Type.STRING, CSV_NULL_FIELD_INDICATOR_DEFAULT, ValidEnum.of(CSVReaderNullFieldIndicator.class), ConfigDef.Importance.LOW, CSV_NULL_FIELD_INDICATOR_DOC, CSV_GROUP, csvPosition++, ConfigDef.Width.LONG, CSV_DISPLAY_NAME) - .define(CSV_FIRST_ROW_AS_HEADER_CONF, ConfigDef.Type.BOOLEAN, CSV_FIRST_ROW_AS_HEADER_DEFAULT, ConfigDef.Importance.MEDIUM, CSV_FIRST_ROW_AS_HEADER_DOC, CSV_GROUP, csvPosition++, ConfigDef.Width.LONG, CSV_DISPLAY_NAME) - .define(CSV_CHARSET_CONF, ConfigDef.Type.STRING, CSV_CHARSET_DEFAULT, CharsetValidator.of(), ConfigDef.Importance.LOW, CSV_CHARSET_DOC, CSV_GROUP, csvPosition++, ConfigDef.Width.LONG, CSV_DISPLAY_NAME) - .define(CSV_CASE_SENSITIVE_FIELD_NAMES_CONF, ConfigDef.Type.BOOLEAN, false, ConfigDef.Importance.LOW, CSV_CASE_SENSITIVE_FIELD_NAMES_DOC); + .define( + ConfigKeyBuilder.of(CSV_SKIP_LINES_CONF, ConfigDef.Type.INT) + .defaultValue(CSV_SKIP_LINES_DEFAULT) + .importance(ConfigDef.Importance.LOW) + .documentation(CSV_SKIP_LINES_DOC) + .group(CSV_GROUP) + .width(ConfigDef.Width.LONG) + .displayName(CSV_DISPLAY_NAME) + .build() + ) + .define( + ConfigKeyBuilder.of(CSV_SEPARATOR_CHAR_CONF, ConfigDef.Type.INT) + .defaultValue(CSV_SEPARATOR_CHAR_DEFAULT) + .importance(ConfigDef.Importance.LOW) + .documentation(CSV_SEPARATOR_CHAR_DOC) + .group(CSV_GROUP) + .width(ConfigDef.Width.LONG) + .build() + ) + 
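+ // Each ConfigKeyBuilder block in this method replaces one of the positional
+ // ConfigDef.define(...) calls removed above: group, width, and display name are now
+ // supplied through named builder methods rather than positional arguments, so the
+ // csvPosition counter is no longer required.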
.define( + ConfigKeyBuilder.of(CSV_QUOTE_CHAR_CONF, ConfigDef.Type.INT) + .defaultValue(CSV_QUOTE_CHAR_DEFAULT) + .importance(ConfigDef.Importance.LOW) + .documentation(CSV_QUOTE_CHAR_DOC) + .group(CSV_GROUP) + .width(ConfigDef.Width.LONG) + .build() + ) + .define( + ConfigKeyBuilder.of(CSV_ESCAPE_CHAR_CONF, ConfigDef.Type.INT) + .defaultValue(CSV_ESCAPE_CHAR_DEFAULT) + .importance(ConfigDef.Importance.LOW) + .documentation(CSV_ESCAPE_CHAR_DOC) + .group(CSV_GROUP) + .width(ConfigDef.Width.LONG) + .build() + ) + .define( + ConfigKeyBuilder.of(CSV_STRICT_QUOTES_CONF, ConfigDef.Type.BOOLEAN) + .defaultValue(CSV_STRICT_QUOTES_DEFAULT) + .importance(ConfigDef.Importance.LOW) + .documentation(CSV_STRICT_QUOTES_DOC) + .group(CSV_GROUP) + .width(ConfigDef.Width.LONG) + .build() + ) + .define( + ConfigKeyBuilder.of(CSV_IGNORE_LEADING_WHITESPACE_CONF, ConfigDef.Type.BOOLEAN) + .defaultValue(CSV_IGNORE_LEADING_WHITESPACE_DEFAULT) + .importance(ConfigDef.Importance.LOW) + .documentation(CSV_IGNORE_LEADING_WHITESPACE_DOC) + .group(CSV_GROUP) + .width(ConfigDef.Width.LONG) + .build() + ) + .define( + ConfigKeyBuilder.of(CSV_IGNORE_QUOTATIONS_CONF, ConfigDef.Type.BOOLEAN) + .defaultValue(CSV_IGNORE_QUOTATIONS_DEFAULT) + .importance(ConfigDef.Importance.LOW) + .documentation(CSV_IGNORE_QUOTATIONS_DOC) + .group(CSV_GROUP) + .width(ConfigDef.Width.LONG) + .build() + ) + .define( + ConfigKeyBuilder.of(CSV_KEEP_CARRIAGE_RETURN_CONF, ConfigDef.Type.BOOLEAN) + .defaultValue(CSV_KEEP_CARRIAGE_RETURN_DEFAULT) + .importance(ConfigDef.Importance.LOW) + .documentation(CSV_KEEP_CARRIAGE_RETURN_DOC) + .group(CSV_GROUP) + .width(ConfigDef.Width.LONG) + .build() + ) + .define( + ConfigKeyBuilder.of(CSV_VERIFY_READER_CONF, ConfigDef.Type.BOOLEAN) + .defaultValue(CSV_VERIFY_READER_DEFAULT) + .importance(ConfigDef.Importance.LOW) + .documentation(CSV_VERIFY_READER_DOC) + .group(CSV_GROUP) + .width(ConfigDef.Width.LONG) + .build() + ) + .define( + ConfigKeyBuilder.of(CSV_NULL_FIELD_INDICATOR_CONF, ConfigDef.Type.STRING) + .defaultValue(CSV_NULL_FIELD_INDICATOR_DEFAULT) + .validator(ValidEnum.of(CSVReaderNullFieldIndicator.class)) + .importance(ConfigDef.Importance.LOW) + .documentation(CSV_NULL_FIELD_INDICATOR_DOC) + .group(CSV_GROUP) + .width(ConfigDef.Width.LONG) + .build() + ) + .define( + ConfigKeyBuilder.of(CSV_FIRST_ROW_AS_HEADER_CONF, ConfigDef.Type.BOOLEAN) + .defaultValue(CSV_FIRST_ROW_AS_HEADER_DEFAULT) + .importance(ConfigDef.Importance.MEDIUM) + .documentation(CSV_FIRST_ROW_AS_HEADER_DOC) + .group(CSV_GROUP) + .width(ConfigDef.Width.LONG) + .build() + ) + .define( + ConfigKeyBuilder.of(CSV_CHARSET_CONF, ConfigDef.Type.STRING) + .defaultValue(CSV_CHARSET_DEFAULT) + .validator(CharsetValidator.of()) + .importance(ConfigDef.Importance.LOW) + .documentation(CSV_CHARSET_DOC) + .group(CSV_GROUP) + .width(ConfigDef.Width.LONG) + .build() + ) + .define( + ConfigKeyBuilder.of(CSV_CASE_SENSITIVE_FIELD_NAMES_CONF, ConfigDef.Type.BOOLEAN) + .defaultValue(false) + .importance(ConfigDef.Importance.LOW) + .documentation(CSV_CASE_SENSITIVE_FIELD_NAMES_DOC) + .build() + ); } final char getChar(String key) { diff --git a/src/main/java/com/github/jcustenborder/kafka/connect/spooldir/SpoolDirJsonSourceConnector.java b/src/main/java/com/github/jcustenborder/kafka/connect/spooldir/SpoolDirJsonSourceConnector.java index 01acc9b..5d46634 100644 --- a/src/main/java/com/github/jcustenborder/kafka/connect/spooldir/SpoolDirJsonSourceConnector.java +++ 
b/src/main/java/com/github/jcustenborder/kafka/connect/spooldir/SpoolDirJsonSourceConnector.java @@ -16,11 +16,13 @@ package com.github.jcustenborder.kafka.connect.spooldir; import com.github.jcustenborder.kafka.connect.utils.config.Description; +import com.github.jcustenborder.kafka.connect.utils.config.Title; import org.apache.kafka.common.config.ConfigDef; import org.apache.kafka.connect.connector.Task; import java.util.Map; +@Title("Json Source Connector") @Description("This connector is used to `stream ` JSON files from a directory " + "while converting the data based on the schema supplied in the configuration.") public class SpoolDirJsonSourceConnector extends SpoolDirSourceConnector { diff --git a/src/main/java/com/github/jcustenborder/kafka/connect/spooldir/SpoolDirSourceConnectorConfig.java b/src/main/java/com/github/jcustenborder/kafka/connect/spooldir/SpoolDirSourceConnectorConfig.java index 8a54d30..77bf7a8 100644 --- a/src/main/java/com/github/jcustenborder/kafka/connect/spooldir/SpoolDirSourceConnectorConfig.java +++ b/src/main/java/com/github/jcustenborder/kafka/connect/spooldir/SpoolDirSourceConnectorConfig.java @@ -16,6 +16,7 @@ package com.github.jcustenborder.kafka.connect.spooldir; import com.fasterxml.jackson.core.JsonProcessingException; +import com.github.jcustenborder.kafka.connect.utils.config.ConfigKeyBuilder; import com.github.jcustenborder.kafka.connect.utils.config.ConfigUtils; import com.github.jcustenborder.kafka.connect.utils.config.ValidEnum; import com.github.jcustenborder.kafka.connect.utils.config.ValidPattern; @@ -23,6 +24,7 @@ import com.github.jcustenborder.kafka.connect.utils.jackson.ObjectMapperFactory; import com.google.common.base.Preconditions; import com.google.common.base.Strings; +import com.google.common.collect.ImmutableList; import com.google.common.io.PatternFilenameFilter; import org.apache.kafka.common.config.AbstractConfig; import org.apache.kafka.common.config.ConfigDef; @@ -282,32 +284,220 @@ private static final Field findMetadataField(Schema schema) { return result; } + public static final String GROUP_FILESYSTEM = "File System"; + public static final String GROUP_SCHEMA_GENERATION = "Schema Generation"; + public static final String GROUP_SCHEMA = "Schema"; + public static final String GROUP_GENERAL = "General"; + public static final String GROUP_TIMESTAMP = "Timestamps"; + public static ConfigDef config() { + + ConfigDef.Recommender schemaRecommender = new ConfigDef.Recommender() { + @Override + public List<Object> validValues(String key, Map<String, Object> settings) { + return ImmutableList.of(); + } + + @Override + public boolean visible(String key, Map<String, Object> settings) { + boolean schemaGenerationEnabled = (boolean) settings.get(SCHEMA_GENERATION_ENABLED_CONF); + + if (KEY_SCHEMA_CONF.endsWith(key)) { + return !schemaGenerationEnabled; + } + if (VALUE_SCHEMA_CONF.endsWith(key)) { + return !schemaGenerationEnabled; + } + if (SCHEMA_GENERATION_KEY_NAME_CONF.endsWith(key)) { + return schemaGenerationEnabled; + } + if (SCHEMA_GENERATION_VALUE_NAME_CONF.endsWith(key)) { + return schemaGenerationEnabled; + } + if (SCHEMA_GENERATION_KEY_FIELDS_CONF.endsWith(key)) { + return schemaGenerationEnabled; + } + + return true; + } + }; + + return new ConfigDef() - //PollingDirectoryMonitorConfig - .define(INPUT_PATH_CONFIG, ConfigDef.Type.STRING, ConfigDef.NO_DEFAULT_VALUE, ValidDirectoryWritable.of(), ConfigDef.Importance.HIGH, INPUT_PATH_DOC) - .define(FINISHED_PATH_CONFIG, ConfigDef.Type.STRING, ConfigDef.NO_DEFAULT_VALUE, ValidDirectoryWritable.of(),
ConfigDef.Importance.HIGH, FINISHED_PATH_DOC) - .define(ERROR_PATH_CONFIG, ConfigDef.Type.STRING, ConfigDef.NO_DEFAULT_VALUE, ValidDirectoryWritable.of(), ConfigDef.Importance.HIGH, ERROR_PATH_DOC) - .define(INPUT_FILE_PATTERN_CONF, ConfigDef.Type.STRING, ConfigDef.Importance.HIGH, INPUT_FILE_PATTERN_DOC) - .define(HALT_ON_ERROR_CONF, ConfigDef.Type.BOOLEAN, true, ConfigDef.Importance.HIGH, HALT_ON_ERROR_DOC) - .define(FILE_MINIMUM_AGE_MS_CONF, ConfigDef.Type.LONG, 0L, ConfigDef.Range.between(0L, Long.MAX_VALUE), ConfigDef.Importance.LOW, FILE_MINIMUM_AGE_MS_DOC) - .define(PROCESSING_FILE_EXTENSION_CONF, ConfigDef.Type.STRING, PROCESSING_FILE_EXTENSION_DEFAULT, ValidPattern.of("^.*\\..+$"), ConfigDef.Importance.LOW, PROCESSING_FILE_EXTENSION_DOC) - - .define(BATCH_SIZE_CONF, ConfigDef.Type.INT, BATCH_SIZE_DEFAULT, ConfigDef.Importance.LOW, BATCH_SIZE_DOC) - .define(TOPIC_CONF, ConfigDef.Type.STRING, ConfigDef.Importance.HIGH, TOPIC_DOC) - - .define(KEY_SCHEMA_CONF, Type.STRING, "", ConfigDef.Importance.HIGH, KEY_SCHEMA_DOC) - .define(VALUE_SCHEMA_CONF, Type.STRING, "", ConfigDef.Importance.HIGH, VALUE_SCHEMA_DOC) - .define(PARSER_TIMESTAMP_TIMEZONE_CONF, ConfigDef.Type.STRING, PARSER_TIMESTAMP_TIMEZONE_DEFAULT, ConfigDef.Importance.LOW, PARSER_TIMESTAMP_TIMEZONE_DOC) - .define(PARSER_TIMESTAMP_DATE_FORMATS_CONF, ConfigDef.Type.LIST, PARSER_TIMESTAMP_DATE_FORMATS_DEFAULT, ConfigDef.Importance.LOW, PARSER_TIMESTAMP_DATE_FORMATS_DOC) - - .define(EMPTY_POLL_WAIT_MS_CONF, ConfigDef.Type.LONG, 1000L, ConfigDef.Range.between(1L, Long.MAX_VALUE), ConfigDef.Importance.LOW, EMPTY_POLL_WAIT_MS_DOC) - .define(TIMESTAMP_MODE_CONF, Type.STRING, TimestampMode.PROCESS_TIME.toString(), ValidEnum.of(TimestampMode.class), ConfigDef.Importance.MEDIUM, TIMESTAMP_MODE_DOC) - .define(TIMESTAMP_FIELD_CONF, Type.STRING, "", ConfigDef.Importance.MEDIUM, TIMESTAMP_FIELD_DOC) - .define(SCHEMA_GENERATION_KEY_FIELDS_CONF, Type.LIST, new ArrayList<>(), ConfigDef.Importance.MEDIUM, SCHEMA_GENERATION_KEY_FIELDS_DOC) - .define(SCHEMA_GENERATION_ENABLED_CONF, Type.BOOLEAN, false, ConfigDef.Importance.MEDIUM, SCHEMA_GENERATION_ENABLED_DOC) - .define(SCHEMA_GENERATION_KEY_NAME_CONF, Type.STRING, "com.github.jcustenborder.kafka.connect.model.Key", ConfigDef.Importance.MEDIUM, SCHEMA_GENERATION_KEY_NAME_DOC) - .define(SCHEMA_GENERATION_VALUE_NAME_CONF, Type.STRING, "com.github.jcustenborder.kafka.connect.model.Value", ConfigDef.Importance.MEDIUM, SCHEMA_GENERATION_VALUE_NAME_DOC); + + + .define( + ConfigKeyBuilder.of(TOPIC_CONF, Type.STRING) + .documentation(TOPIC_DOC) + .group(GROUP_GENERAL) + .importance(ConfigDef.Importance.HIGH) + .build() + ).define( + ConfigKeyBuilder.of(BATCH_SIZE_CONF, Type.INT) + .documentation(BATCH_SIZE_DOC) + .importance(ConfigDef.Importance.LOW) + .defaultValue(BATCH_SIZE_DEFAULT) + .group(GROUP_GENERAL) + .build() + ).define( + ConfigKeyBuilder.of(EMPTY_POLL_WAIT_MS_CONF, Type.LONG) + .documentation(EMPTY_POLL_WAIT_MS_DOC) + .importance(ConfigDef.Importance.LOW) + .defaultValue(1000L) + .validator(ConfigDef.Range.between(1L, Long.MAX_VALUE)) + .group(GROUP_GENERAL) + .build() + ) + + // Filesystem + .define( + ConfigKeyBuilder.of(INPUT_PATH_CONFIG, ConfigDef.Type.STRING) + .documentation(INPUT_PATH_DOC) + .importance(ConfigDef.Importance.HIGH) + .validator(ValidDirectoryWritable.of()) + .group(GROUP_FILESYSTEM) + .build() + ).define( + ConfigKeyBuilder.of(FINISHED_PATH_CONFIG, ConfigDef.Type.STRING) + .documentation(FINISHED_PATH_DOC) + .importance(ConfigDef.Importance.HIGH) + 
.validator(ValidDirectoryWritable.of()) + .group(GROUP_FILESYSTEM) + .build() + ).define( + ConfigKeyBuilder.of(ERROR_PATH_CONFIG, ConfigDef.Type.STRING) + .documentation(ERROR_PATH_DOC) + .importance(ConfigDef.Importance.HIGH) + .validator(ValidDirectoryWritable.of()) + .group(GROUP_FILESYSTEM) + .build() + ).define( + ConfigKeyBuilder.of(INPUT_FILE_PATTERN_CONF, ConfigDef.Type.STRING) + .documentation(INPUT_FILE_PATTERN_DOC) + .importance(ConfigDef.Importance.HIGH) + .group(GROUP_FILESYSTEM) + .build() + ).define( + ConfigKeyBuilder.of(HALT_ON_ERROR_CONF, Type.BOOLEAN) + .documentation(HALT_ON_ERROR_DOC) + .importance(ConfigDef.Importance.HIGH) + .defaultValue(true) + .group(GROUP_FILESYSTEM) + .build() + ).define( + ConfigKeyBuilder.of(FILE_MINIMUM_AGE_MS_CONF, Type.LONG) + .documentation(FILE_MINIMUM_AGE_MS_DOC) + .importance(ConfigDef.Importance.LOW) + .group(GROUP_FILESYSTEM) + .defaultValue(0L) + .validator(ConfigDef.Range.atLeast(0L)) + .build() + ).define( + ConfigKeyBuilder.of(PROCESSING_FILE_EXTENSION_CONF, Type.STRING) + .documentation(PROCESSING_FILE_EXTENSION_DOC) + .importance(ConfigDef.Importance.LOW) + .validator(ValidDirectoryWritable.of()) + .group(GROUP_FILESYSTEM) + .defaultValue(PROCESSING_FILE_EXTENSION_DEFAULT) + .validator(ValidPattern.of("^.*\\..+$")) + .build() + ) + + .define( + ConfigKeyBuilder.of(KEY_SCHEMA_CONF, Type.STRING) + .documentation(KEY_SCHEMA_DOC) + .importance(ConfigDef.Importance.HIGH) + .group(GROUP_SCHEMA) + .defaultValue("") + .width(ConfigDef.Width.LONG) + .build() + ).define( + ConfigKeyBuilder.of(VALUE_SCHEMA_CONF, Type.STRING) + .documentation(VALUE_SCHEMA_DOC) + .importance(ConfigDef.Importance.HIGH) + .group(GROUP_SCHEMA) + .defaultValue("") + .width(ConfigDef.Width.LONG) + .build() + ) + + .define( + ConfigKeyBuilder.of(SCHEMA_GENERATION_ENABLED_CONF, Type.BOOLEAN) + .documentation(SCHEMA_GENERATION_ENABLED_DOC) + .importance(ConfigDef.Importance.MEDIUM) + .group(GROUP_SCHEMA_GENERATION) + .defaultValue(false) + .recommender(schemaRecommender) + .build() + ) + .define( + ConfigKeyBuilder.of(SCHEMA_GENERATION_KEY_FIELDS_CONF, Type.LIST) + .documentation(SCHEMA_GENERATION_KEY_FIELDS_DOC) + .importance(ConfigDef.Importance.MEDIUM) + .group(GROUP_SCHEMA_GENERATION) + .defaultValue(ImmutableList.of()) + .recommender(schemaRecommender) + .build() + ).define( + ConfigKeyBuilder.of(SCHEMA_GENERATION_KEY_NAME_CONF, Type.STRING) + .documentation(SCHEMA_GENERATION_KEY_NAME_DOC) + .importance(ConfigDef.Importance.MEDIUM) + .group(GROUP_SCHEMA_GENERATION) + .defaultValue("com.github.jcustenborder.kafka.connect.model.Key") + .recommender(schemaRecommender) + .build() + ).define( + ConfigKeyBuilder.of(SCHEMA_GENERATION_VALUE_NAME_CONF, Type.STRING) + .documentation(SCHEMA_GENERATION_VALUE_NAME_DOC) + .importance(ConfigDef.Importance.MEDIUM) + .group(GROUP_SCHEMA_GENERATION) + .defaultValue("com.github.jcustenborder.kafka.connect.model.Value") + .recommender(schemaRecommender) + .build() + ) + + .define( + ConfigKeyBuilder.of(PARSER_TIMESTAMP_TIMEZONE_CONF, Type.STRING) + .documentation(PARSER_TIMESTAMP_TIMEZONE_DOC) + .importance(ConfigDef.Importance.LOW) + .group(GROUP_TIMESTAMP) + .defaultValue(PARSER_TIMESTAMP_TIMEZONE_DEFAULT) + .build() + ).define( + ConfigKeyBuilder.of(PARSER_TIMESTAMP_DATE_FORMATS_CONF, Type.LIST) + .documentation(PARSER_TIMESTAMP_DATE_FORMATS_DOC) + .importance(ConfigDef.Importance.LOW) + .group(GROUP_TIMESTAMP) + .defaultValue(PARSER_TIMESTAMP_DATE_FORMATS_DEFAULT) + .build() + ).define( + 
ConfigKeyBuilder.of(TIMESTAMP_MODE_CONF, Type.STRING) + .documentation(TIMESTAMP_MODE_DOC) + .importance(ConfigDef.Importance.MEDIUM) + .group(GROUP_TIMESTAMP) + .defaultValue(TimestampMode.PROCESS_TIME.toString()) + .validator(ValidEnum.of(TimestampMode.class)) + .build() + ).define( + ConfigKeyBuilder.of(TIMESTAMP_FIELD_CONF, Type.STRING) + .documentation(TIMESTAMP_FIELD_DOC) + .importance(ConfigDef.Importance.MEDIUM) + .group(GROUP_TIMESTAMP) + .defaultValue("") + .recommender(new ConfigDef.Recommender() { + @Override + public List<Object> validValues(String key, Map<String, Object> settings) { + return ImmutableList.of(); + } + + @Override + public boolean visible(String key, Map<String, Object> settings) { + String timestampMode = (String) settings.get(TIMESTAMP_MODE_CONF); + return TimestampMode.FIELD.toString().equals(timestampMode); + } + }) + .build() + ); } Schema readSchema(final String key) { diff --git a/src/main/java/com/github/jcustenborder/kafka/connect/spooldir/package-info.java b/src/main/java/com/github/jcustenborder/kafka/connect/spooldir/package-info.java new file mode 100644 index 0000000..7c3285c --- /dev/null +++ b/src/main/java/com/github/jcustenborder/kafka/connect/spooldir/package-info.java @@ -0,0 +1,30 @@ +/** + * Copyright © 2016 Jeremy Custenborder (jcustenborder@gmail.com) + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +@Introduction("\n" + + "This Kafka Connect connector provides the capability to watch a directory for files and " + + "read the data as new files are written to the input directory. Each of the records in the " + + "input file will be converted based on the user-supplied schema.\n" + + "\n" + + "The CSVRecordProcessor supports reading CSV or TSV files. It can convert a CSV on the fly " + + "to the strongly typed Kafka Connect data types. It currently has support for all of the " + + "schema types and logical types that are supported in Kafka Connect. If you couple this " + + "with the Avro converter and Schema Registry by Confluent, you will be able to process " + + "CSV, JSON, or TSV files to strongly typed Avro data in real time.") + @Title("File Input") +package com.github.jcustenborder.kafka.connect.spooldir; + +import com.github.jcustenborder.kafka.connect.utils.config.Introduction; +import com.github.jcustenborder.kafka.connect.utils.config.Title; \ No newline at end of file
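The visibility recommenders added in this patch all follow the same pattern: a setting is shown only when the mode that uses it is active. Below is a minimal standalone sketch of that pattern against the stock org.apache.kafka.common.config.ConfigDef API, not code from this patch; the class name and the "schema.generation.enabled" key are illustrative assumptions, while `key.schema` is the setting name referenced in the connector documentation above.

import java.util.Collections;
import java.util.List;
import java.util.Map;

import org.apache.kafka.common.config.ConfigDef;

public class SchemaVisibilityRecommenderSketch implements ConfigDef.Recommender {
  // Assumed setting names, used here only for illustration; the connector defines its own constants.
  static final String SCHEMA_GENERATION_ENABLED = "schema.generation.enabled";
  static final String KEY_SCHEMA = "key.schema";

  @Override
  public List<Object> validValues(String name, Map<String, Object> parsedConfig) {
    // No values are recommended; this recommender only controls visibility in a UI.
    return Collections.emptyList();
  }

  @Override
  public boolean visible(String name, Map<String, Object> parsedConfig) {
    // Hide the manually supplied key schema once schema generation is enabled.
    boolean generationEnabled = Boolean.TRUE.equals(parsedConfig.get(SCHEMA_GENERATION_ENABLED));
    if (KEY_SCHEMA.equals(name)) {
      return !generationEnabled;
    }
    return true;
  }
}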
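And a short sketch of how the @Title and @Description annotations introduced in this patch would be applied when documenting an additional connector. SpoolDirTsvSourceConnector is hypothetical and assumes SpoolDirCsvSourceConnector can be subclassed; it exists only to show where the annotations are placed.

package com.github.jcustenborder.kafka.connect.spooldir;

import com.github.jcustenborder.kafka.connect.utils.config.Description;
import com.github.jcustenborder.kafka.connect.utils.config.Title;

// Hypothetical connector, shown only to illustrate the documentation annotations.
@Title("TSV Source Connector")
@Description("Monitors `input.path` for tab separated files and converts each record to the "
    + "strongly typed equivalent specified in `key.schema` and `value.schema`.")
public class SpoolDirTsvSourceConnector extends SpoolDirCsvSourceConnector {
}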