From 69e5686800476ac384f9f9205ac7cf8e84422533 Mon Sep 17 00:00:00 2001 From: Brian O'Neill Date: Tue, 10 Oct 2017 21:51:16 -0400 Subject: [PATCH 1/4] scaffolding --- build.sbt | 21 +- .../tranquility/kinesis/KinesisConsumer.java | 189 ++++++++++ .../tranquility/kinesis/KinesisMain.java | 111 ++++++ .../kinesis/model/MessageCounters.java | 81 +++++ .../model/PropertiesBasedKinesisConfig.java | 78 ++++ .../writer/TranquilityEventWriter.java | 141 ++++++++ .../kinesis/writer/WriterController.java | 174 +++++++++ .../tranquility/kafka/KafkaBeamUtils.scala | 51 +++ .../tranquility/kafka/KafkaConsumerTest.java | 342 ++++++++++++++++++ .../kafka/writer/WriterControllerTest.java | 207 +++++++++++ 10 files changed, 1394 insertions(+), 1 deletion(-) create mode 100644 kinesis/src/main/java/com/metamx/tranquility/kinesis/KinesisConsumer.java create mode 100644 kinesis/src/main/java/com/metamx/tranquility/kinesis/KinesisMain.java create mode 100644 kinesis/src/main/java/com/metamx/tranquility/kinesis/model/MessageCounters.java create mode 100644 kinesis/src/main/java/com/metamx/tranquility/kinesis/model/PropertiesBasedKinesisConfig.java create mode 100644 kinesis/src/main/java/com/metamx/tranquility/kinesis/writer/TranquilityEventWriter.java create mode 100644 kinesis/src/main/java/com/metamx/tranquility/kinesis/writer/WriterController.java create mode 100644 kinesis/src/main/scala/com/metamx/tranquility/kafka/KafkaBeamUtils.scala create mode 100644 kinesis/src/test/java/com/metamx/tranquility/kafka/KafkaConsumerTest.java create mode 100644 kinesis/src/test/java/com/metamx/tranquility/kafka/writer/WriterControllerTest.java diff --git a/build.sbt b/build.sbt index 521f729..b336413 100644 --- a/build.sbt +++ b/build.sbt @@ -136,6 +136,14 @@ val kafkaDependencies = Seq( "io.airlift" % "airline" % airlineVersion ) ++ loggingDependencies +val kinesisDependencies = Seq( + "org.apache.kafka" %% "kafka" % kafkaVersion + exclude("org.slf4j", "slf4j-log4j12") + exclude("log4j", "log4j") 
+ force(), + "io.airlift" % "airline" % airlineVersion +) ++ loggingDependencies + val coreTestDependencies = Seq( "org.scalatest" %% "scalatest" % "2.2.5" % "test", dependOnDruid("druid-services") % "test", @@ -175,6 +183,10 @@ val kafkaTestDependencies = Seq( "org.easymock" % "easymock" % "3.4" % "test" ) +val kinesisTestDependencies = Seq( + "org.easymock" % "easymock" % "3.4" % "test" +) + lazy val commonSettings = Seq( organization := "io.druid", @@ -266,6 +278,13 @@ lazy val kafka = project.in(file("kafka")) .settings(publishArtifact in Test := false) .dependsOn(core % "test->test;compile->compile") +lazy val kinesis = project.in(file("kinesis")) + .settings(commonSettings: _*) + .settings(name := "tranquility-kinesis") + .settings(libraryDependencies ++= (kinesisDependencies ++ kinesisTestDependencies)) + .settings(publishArtifact in Test := false) + .dependsOn(core % "test->test;compile->compile") + lazy val distribution = project.in(file("distribution")) .settings(commonSettings: _*) .settings(name := "tranquility-distribution") @@ -274,4 +293,4 @@ lazy val distribution = project.in(file("distribution")) .settings(executableScriptName := "tranquility") .settings(bashScriptExtraDefines += """addJava "-Dlogback.configurationFile=${app_home}/../conf/logback.xml"""") .enablePlugins(JavaAppPackaging) - .dependsOn(kafka, server) + .dependsOn(kafka, kinesis, server) diff --git a/kinesis/src/main/java/com/metamx/tranquility/kinesis/KinesisConsumer.java b/kinesis/src/main/java/com/metamx/tranquility/kinesis/KinesisConsumer.java new file mode 100644 index 0000000..6212ec0 --- /dev/null +++ b/kinesis/src/main/java/com/metamx/tranquility/kinesis/KinesisConsumer.java @@ -0,0 +1,189 @@ +/* + * Licensed to Metamarkets Group Inc. (Metamarkets) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. 
Metamarkets licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package com.metamx.tranquility.kinesis; + +import com.google.common.base.Throwables; +import com.metamx.common.logger.Logger; +import com.metamx.tranquility.config.DataSourceConfig; +import com.metamx.tranquility.kafka.model.MessageCounters; +import com.metamx.tranquility.kinesis.model.PropertiesBasedKinesisConfig; +import com.metamx.tranquility.kafka.writer.WriterController; +import io.druid.concurrent.Execs; + + +import java.util.HashMap; +import java.util.Map; +import java.util.Properties; +import java.util.concurrent.ExecutorService; +import java.util.concurrent.atomic.AtomicBoolean; +import java.util.concurrent.locks.ReentrantReadWriteLock; + +/** + * Spawns a number of threads to read messages from Kafka topics and write them by calling + * WriterController.getWriter(topic).send(). Will periodically call WriterController.flushAll() and when this completes + * will call ConsumerConnector.commitOffsets() to save the last written offset to ZooKeeper. This implementation + * guarantees that any events in Kafka will be read at least once even in case of a failure condition but does not + * guarantee that duplication will not occur. 
+ */ +public class KinesisConsumer +{ + private static final Logger log = new Logger(KinesisConsumer.class); + + private final ExecutorService consumerExec; + private final Thread commitThread; + private final AtomicBoolean shutdown = new AtomicBoolean(); + + // prevents reading the next event from Kafka while events are being flushed and offset is being committed to ZK + private final ReentrantReadWriteLock commitLock = new ReentrantReadWriteLock(); + + private final int numThreads; + private final int commitMillis; + private final WriterController writerController; + + private Map previousMessageCounters = new HashMap<>(); + + public KinesisConsumer( + final PropertiesBasedKinesisConfig globalConfig, + final Properties kinesisProperties, + final Map> dataSourceConfigs, + final WriterController writerController + ) + { + int defaultNumThreads = Math.max(1, Runtime.getRuntime().availableProcessors() - 1); + this.numThreads = globalConfig.getConsumerNumThreads() > 0 + ? globalConfig.getConsumerNumThreads() + : defaultNumThreads; + + this.commitMillis = globalConfig.getCommitPeriodMillis(); + this.writerController = writerController; + this.consumerExec = Execs.multiThreaded(numThreads, "KafkaConsumer-%d"); + this.commitThread = new Thread(createCommitRunnable()); + this.commitThread.setName("KafkaConsumer-CommitThread"); + this.commitThread.setDaemon(true); + } + + public void start() + { + commitThread.start(); + // startConsumers(); + } + + public void stop() + { + if (shutdown.compareAndSet(false, true)) { + log.info("Shutting down - attempting to flush buffers and commit final offsets"); + + try { + commitLock.writeLock().lockInterruptibly(); // prevent Kafka from consuming any more events + try { + writerController.flushAll(); // try to flush the remaining events to Druid + writerController.stop(); + } + finally { + commitLock.writeLock().unlock(); + commitThread.interrupt(); + consumerExec.shutdownNow(); + } + } + catch (InterruptedException e) { + 
Thread.currentThread().interrupt(); + Throwables.propagate(e); + } + + log.info("Finished clean shutdown."); + } + } + + public void join() throws InterruptedException + { + commitThread.join(); + } + + void commit() throws InterruptedException + { + commitLock.writeLock().lockInterruptibly(); + try { + final long flushStartTime = System.currentTimeMillis(); + final Map messageCounters = writerController.flushAll(); // blocks until complete + + final long commitStartTime = System.currentTimeMillis(); + + final long finishedTime = System.currentTimeMillis(); + Map countsSinceLastCommit = new HashMap(); + for (Map.Entry entry : messageCounters.entrySet()) { + countsSinceLastCommit.put( + entry.getKey(), + entry.getValue().difference(previousMessageCounters.get(entry.getKey())) + ); + } + + previousMessageCounters = messageCounters; + + log.info( + "Flushed %s pending messages in %sms and committed offsets in %sms.", + countsSinceLastCommit.isEmpty() ? "0" : countsSinceLastCommit, + commitStartTime - flushStartTime, + finishedTime - commitStartTime + ); + } + finally { + commitLock.writeLock().unlock(); + } + } + + private Runnable createCommitRunnable() + { + return new Runnable() + { + @Override + public void run() + { + long lastFlushTime = System.currentTimeMillis(); + try { + while (!Thread.currentThread().isInterrupted()) { + Thread.sleep(Math.max(commitMillis - (System.currentTimeMillis() - lastFlushTime), 0)); + commit(); + lastFlushTime = System.currentTimeMillis(); + } + } + catch (InterruptedException e) { + Thread.currentThread().interrupt(); + log.info("Commit thread interrupted."); + } + catch (Throwable e) { + log.error(e, "Commit thread failed!"); + throw Throwables.propagate(e); + } + finally { + stop(); + } + } + }; + } + + private static String buildTopicFilter(Map> dataSourceConfigs) + { + StringBuilder topicFilter = new StringBuilder(); + for (Map.Entry> entry : dataSourceConfigs.entrySet()) { + topicFilter.append(String.format("(%s)|", 
entry.getValue().propertiesBasedConfig().getTopicPattern())); + } + + return topicFilter.length() > 0 ? topicFilter.substring(0, topicFilter.length() - 1) : ""; + } +} diff --git a/kinesis/src/main/java/com/metamx/tranquility/kinesis/KinesisMain.java b/kinesis/src/main/java/com/metamx/tranquility/kinesis/KinesisMain.java new file mode 100644 index 0000000..a36c819 --- /dev/null +++ b/kinesis/src/main/java/com/metamx/tranquility/kinesis/KinesisMain.java @@ -0,0 +1,111 @@ +package com.metamx.tranquility.kinesis; + +import com.google.common.base.Throwables; +import com.google.common.collect.Maps; +import com.metamx.common.logger.Logger; +import com.metamx.tranquility.config.DataSourceConfig; +import com.metamx.tranquility.config.TranquilityConfig; +import com.metamx.tranquility.kinesis.model.PropertiesBasedKinesisConfig; +import io.airlift.airline.Help; +import io.airlift.airline.HelpOption; +import io.airlift.airline.Option; +import io.airlift.airline.SingleCommand; +import javax.inject.Inject; +import java.io.FileInputStream; +import java.io.IOException; +import java.io.InputStream; +import java.util.Map; +import java.util.Properties; + +public class KinesisMain +{ + + private static final Logger log = new Logger(KinesisMain.class); + + @Inject + public HelpOption helpOption; + + @Option(name = {"-f", "-configFile"}, description = "Path to configuration property file") + public String propertiesFile; + + public static void main(String[] args) throws Exception + { + KinesisMain main; + try { + main = SingleCommand.singleCommand(KinesisMain.class).parse(args); + } + catch (Exception e) { + log.error(e, "Exception parsing arguments"); + Help.help(SingleCommand.singleCommand(KinesisMain.class).getCommandMetadata()); + return; + } + + if (main.helpOption.showHelpIfRequested()) { + return; + } + + main.run(); + } + + public void run() throws InterruptedException + { + if (propertiesFile == null || propertiesFile.isEmpty()) { + helpOption.help = true; + 
helpOption.showHelpIfRequested(); + + log.warn("Missing required parameters, aborting."); + return; + } + + TranquilityConfig config = null; + try (InputStream in = new FileInputStream(propertiesFile)) { + config = TranquilityConfig.read(in, PropertiesBasedKinesisConfig.class); + } + catch (IOException e) { + log.error("Could not read config file: %s, aborting.", propertiesFile); + Throwables.propagate(e); + } + + PropertiesBasedKinesisConfig globalConfig = config.globalConfig(); + Map> dataSourceConfigs = Maps.newHashMap(); + for (String dataSource : config.getDataSources()) { + dataSourceConfigs.put(dataSource, config.getDataSource(dataSource)); + } + + // find all properties that start with 'kafka.' and pass them on to Kafka + final Properties kafkaProperties = new Properties(); + for (String propertyName : config.globalConfig().properties().stringPropertyNames()) { + if (propertyName.startsWith("kafka.")) { + kafkaProperties.setProperty( + propertyName.replaceFirst("kafka\\.", ""), + config.globalConfig().properties().getProperty(propertyName) + ); + } + } + + // set the critical Kafka configs again from TranquilityKafkaConfig so it picks up the defaults + kafkaProperties.setProperty("group.id", globalConfig.getKafkaGroupId()); + kafkaProperties.setProperty("zookeeper.connect", globalConfig.getKafkaZookeeperConnect()); + if (kafkaProperties.setProperty( + "zookeeper.session.timeout.ms", + Long.toString(globalConfig.zookeeperTimeout().toStandardDuration().getMillis()) + ) != null) { + throw new IllegalArgumentException( + "Set zookeeper.timeout instead of setting kafka.zookeeper.session.timeout.ms" + ); + } + + Runtime.getRuntime().addShutdownHook( + new Thread( + new Runnable() + { + @Override + public void run() + { + log.info("Initiating shutdown..."); + } + } + ) + ); + } +} diff --git a/kinesis/src/main/java/com/metamx/tranquility/kinesis/model/MessageCounters.java b/kinesis/src/main/java/com/metamx/tranquility/kinesis/model/MessageCounters.java new file 
mode 100644 index 0000000..af9feef --- /dev/null +++ b/kinesis/src/main/java/com/metamx/tranquility/kinesis/model/MessageCounters.java @@ -0,0 +1,81 @@ +/* + * Licensed to Metamarkets Group Inc. (Metamarkets) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. Metamarkets licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +package com.metamx.tranquility.kafka.model; + +/** + * Used for passing received, sent, and failed message counts from SimpleTranquilizerAdapter. 
+ */ +public class MessageCounters +{ + private final long receivedCount, sentCount, droppedCount, unparseableCount; + + public MessageCounters(long receivedCount, long sentCount, long droppedCount, long unparseableCount) + { + this.receivedCount = receivedCount; + this.sentCount = sentCount; + this.droppedCount = droppedCount; + this.unparseableCount = unparseableCount; + } + + public long getReceivedCount() + { + return this.receivedCount; + } + + public long getSentCount() + { + return this.sentCount; + } + + public long getDroppedCount() + { + return droppedCount; + } + + public long getUnparseableCount() + { + return unparseableCount; + } + + public MessageCounters difference(MessageCounters subtrahend) + { + if (subtrahend == null) { + return this; + } + + return new MessageCounters( + this.receivedCount - subtrahend.getReceivedCount(), + this.sentCount - subtrahend.getSentCount(), + this.droppedCount - subtrahend.getDroppedCount(), + this.unparseableCount - subtrahend.getUnparseableCount() + ); + } + + @Override + public String toString() + { + return "{" + + "receivedCount=" + receivedCount + + ", sentCount=" + sentCount + + ", droppedCount=" + droppedCount + + ", unparseableCount=" + unparseableCount + + '}'; + } +} diff --git a/kinesis/src/main/java/com/metamx/tranquility/kinesis/model/PropertiesBasedKinesisConfig.java b/kinesis/src/main/java/com/metamx/tranquility/kinesis/model/PropertiesBasedKinesisConfig.java new file mode 100644 index 0000000..4eff30d --- /dev/null +++ b/kinesis/src/main/java/com/metamx/tranquility/kinesis/model/PropertiesBasedKinesisConfig.java @@ -0,0 +1,78 @@ +/* + * Licensed to Metamarkets Group Inc. (Metamarkets) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. 
Metamarkets licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +package com.metamx.tranquility.kinesis.model; + +import com.google.common.collect.ImmutableSet; +import com.metamx.tranquility.config.PropertiesBasedConfig; +import org.skife.config.Config; +import org.skife.config.Default; + +/** + * Configuration object which extends Tranquility configuration with Kafka specific parameters. + */ +public abstract class PropertiesBasedKinesisConfig extends PropertiesBasedConfig +{ + public PropertiesBasedKinesisConfig() + { + super( + ImmutableSet.of( + "kafka.group.id", + "kafka.zookeeper.connect", + "consumer.numThreads", + "commit.periodMillis" + ) + ); + } + + @Config("kafka.group.id") + @Default("tranquility-kafka") + public abstract String getKafkaGroupId(); + + @Config("kafka.zookeeper.connect") + public abstract String getKafkaZookeeperConnect(); + + @Config("consumer.numThreads") + @Default("-1") + public abstract Integer getConsumerNumThreads(); + + @Config("topicPattern") + @Default("(?!)") + public abstract String getTopicPattern(); + + @Config("useTopicAsDataSource") + @Default("false") + public abstract Boolean useTopicAsDataSource(); + + @Config("topicPattern.priority") + @Default("1") + public abstract Integer getTopicPatternPriority(); + + @Config("commit.periodMillis") + @Default("15000") + public abstract Integer getCommitPeriodMillis(); + + @Config("reportDropsAsExceptions") + @Default("false") + public abstract Boolean 
reportDropsAsExceptions(); + + @Config("reportParseExceptions") + @Default("false") + public abstract Boolean reportParseExceptions(); +} diff --git a/kinesis/src/main/java/com/metamx/tranquility/kinesis/writer/TranquilityEventWriter.java b/kinesis/src/main/java/com/metamx/tranquility/kinesis/writer/TranquilityEventWriter.java new file mode 100644 index 0000000..f3214de --- /dev/null +++ b/kinesis/src/main/java/com/metamx/tranquility/kinesis/writer/TranquilityEventWriter.java @@ -0,0 +1,141 @@ +/* + * Licensed to Metamarkets Group Inc. (Metamarkets) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. Metamarkets licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. 
+ */ +package com.metamx.tranquility.kafka.writer; + +import com.fasterxml.jackson.databind.ObjectMapper; +import com.google.common.base.Throwables; +import com.metamx.common.logger.Logger; +import com.metamx.common.parsers.ParseException; +import com.metamx.tranquility.config.DataSourceConfig; +import com.metamx.tranquility.finagle.FinagleRegistry; +import com.metamx.tranquility.kafka.KafkaBeamUtils; +import com.metamx.tranquility.kafka.model.MessageCounters; +import com.metamx.tranquility.kafka.model.PropertiesBasedKinesisConfig; +import com.metamx.tranquility.tranquilizer.MessageDroppedException; +import com.metamx.tranquility.tranquilizer.Tranquilizer; +import com.twitter.util.FutureEventListener; +import org.apache.curator.framework.CuratorFramework; +import scala.runtime.BoxedUnit; + +import java.util.concurrent.atomic.AtomicLong; +import java.util.concurrent.atomic.AtomicReference; + +/** + * Pushes events to Druid through Tranquility using the SimpleTranquilizerAdapter. + */ +public class TranquilityEventWriter +{ + private static final Logger log = new Logger(TranquilityEventWriter.class); + private static final ObjectMapper MAPPER = new ObjectMapper(); + + private final DataSourceConfig dataSourceConfig; + private final Tranquilizer tranquilizer; + + private final AtomicLong receivedCounter = new AtomicLong(); + private final AtomicLong sentCounter = new AtomicLong(); + private final AtomicLong droppedCounter = new AtomicLong(); + private final AtomicLong unparseableCounter = new AtomicLong(); + private final AtomicReference exception = new AtomicReference<>(); + + public TranquilityEventWriter( + String topic, + DataSourceConfig dataSourceConfig, + CuratorFramework curator, + FinagleRegistry finagleRegistry + ) + { + this.dataSourceConfig = dataSourceConfig; + this.tranquilizer = KafkaBeamUtils.createTranquilizer( + topic, + dataSourceConfig, + curator, + finagleRegistry + ); + this.tranquilizer.start(); + } + + public void send(byte[] message) throws 
InterruptedException + { + receivedCounter.incrementAndGet(); + tranquilizer.send(message).addEventListener( + new FutureEventListener() + { + @Override + public void onSuccess(BoxedUnit value) + { + sentCounter.incrementAndGet(); + } + + @Override + public void onFailure(Throwable cause) + { + if (cause instanceof MessageDroppedException) { + droppedCounter.incrementAndGet(); + if (!dataSourceConfig.propertiesBasedConfig().reportDropsAsExceptions()) { + return; + } + } else if (cause instanceof ParseException) { + unparseableCounter.incrementAndGet(); + if (!dataSourceConfig.propertiesBasedConfig().reportParseExceptions()) { + return; + } + } + + exception.compareAndSet(null, cause); + } + } + ); + + maybeThrow(); + } + + public void flush() throws InterruptedException + { + tranquilizer.flush(); + maybeThrow(); + } + + public void stop() + { + try { + tranquilizer.stop(); + } + catch (IllegalStateException e) { + log.info(e, "Exception while stopping Tranquility"); + } + } + + public MessageCounters getMessageCounters() + { + return new MessageCounters( + receivedCounter.get(), + sentCounter.get(), + droppedCounter.get(), + unparseableCounter.get() + ); + } + + private void maybeThrow() + { + final Throwable e = exception.get(); + if (e != null) { + throw Throwables.propagate(e); + } + } +} diff --git a/kinesis/src/main/java/com/metamx/tranquility/kinesis/writer/WriterController.java b/kinesis/src/main/java/com/metamx/tranquility/kinesis/writer/WriterController.java new file mode 100644 index 0000000..b942912 --- /dev/null +++ b/kinesis/src/main/java/com/metamx/tranquility/kinesis/writer/WriterController.java @@ -0,0 +1,174 @@ +/* + * Licensed to Metamarkets Group Inc. (Metamarkets) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. 
Metamarkets licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package com.metamx.tranquility.kafka.writer; + +import com.google.common.primitives.Ints; +import com.metamx.common.logger.Logger; +import com.metamx.common.scala.net.curator.Disco; +import com.metamx.tranquility.config.DataSourceConfig; +import com.metamx.tranquility.finagle.FinagleRegistry; +import com.metamx.tranquility.finagle.FinagleRegistryConfig; +import com.metamx.tranquility.kafka.model.MessageCounters; +import com.metamx.tranquility.kafka.model.PropertiesBasedKinesisConfig; +import org.apache.curator.RetryPolicy; +import org.apache.curator.framework.CuratorFramework; +import org.apache.curator.framework.CuratorFrameworkFactory; +import org.apache.curator.retry.ExponentialBackoffRetry; + +import java.util.ArrayList; +import java.util.Collections; +import java.util.Comparator; +import java.util.HashMap; +import java.util.List; +import java.util.Map; +import java.util.concurrent.ConcurrentHashMap; +import java.util.regex.Pattern; + +/** + * Manages the creation and operation of TranquilityEventWriters. 
+ */ +public class WriterController +{ + private static final Logger log = new Logger(WriterController.class); + private static final RetryPolicy RETRY_POLICY = new ExponentialBackoffRetry(1000, 500, 30000); + + private List> dataSourceConfigList; + private Map writers = new ConcurrentHashMap<>(); + private Map curators = new ConcurrentHashMap<>(); + private Map finagleRegistries = new ConcurrentHashMap<>(); + + public WriterController(Map> dataSourceConfigs) + { + this.dataSourceConfigList = new ArrayList<>(dataSourceConfigs.values()); + Collections.sort( + dataSourceConfigList, + new Comparator>() + { + @Override + public int compare( + DataSourceConfig o1, + DataSourceConfig o2 + ) + { + return o2.propertiesBasedConfig() + .getTopicPatternPriority() + .compareTo(o1.propertiesBasedConfig().getTopicPatternPriority()); + } + } + ); + + log.info("Ready: [topicPattern] -> dataSource mappings:"); + for (DataSourceConfig dataSourceConfig : this.dataSourceConfigList) { + log.info( + " [%s] -> %s (priority: %d)", + dataSourceConfig.propertiesBasedConfig().getTopicPattern(), + dataSourceConfig.dataSource(), + dataSourceConfig.propertiesBasedConfig().getTopicPatternPriority() + ); + } + } + + public synchronized TranquilityEventWriter getWriter(String topic) + { + if (!writers.containsKey(topic)) { + // create a EventWriter using the spec mapped to the first matching topicPattern + for (DataSourceConfig dataSourceConfig : dataSourceConfigList) { + if (Pattern.matches(dataSourceConfig.propertiesBasedConfig().getTopicPattern(), topic)) { + log.info( + "Creating EventWriter for topic [%s] using dataSource [%s]", + topic, + dataSourceConfig.dataSource() + ); + writers.put(topic, createWriter(topic, dataSourceConfig)); + return writers.get(topic); + } + } + + throw new RuntimeException(String.format("Kafka topicFilter allowed topic [%s] but no spec is mapped", topic)); + } + + return writers.get(topic); + } + + public Map flushAll() throws InterruptedException + { + Map 
flushedEvents = new HashMap<>(); + for (Map.Entry entry : writers.entrySet()) { + entry.getValue().flush(); + flushedEvents.put(entry.getKey(), entry.getValue().getMessageCounters()); + } + + return flushedEvents; + } + + public void stop() + { + for (Map.Entry entry : writers.entrySet()) { + entry.getValue().stop(); + } + + for (Map.Entry entry : curators.entrySet()) { + entry.getValue().close(); + } + } + + protected TranquilityEventWriter createWriter( + String topic, + DataSourceConfig dataSourceConfig + ) + { + final String curatorKey = dataSourceConfig.propertiesBasedConfig().zookeeperConnect(); + if (!curators.containsKey(curatorKey)) { + final int zkTimeout = Ints.checkedCast( + dataSourceConfig.propertiesBasedConfig() + .zookeeperTimeout() + .toStandardDuration() + .getMillis() + ); + + final CuratorFramework curator = CuratorFrameworkFactory.builder() + .connectString( + dataSourceConfig.propertiesBasedConfig() + .zookeeperConnect() + ) + .connectionTimeoutMs(zkTimeout) + .retryPolicy(RETRY_POLICY) + .build(); + curator.start(); + curators.put(curatorKey, curator); + } + + final String finagleKey = String.format("%s:%s", curatorKey, dataSourceConfig.propertiesBasedConfig().discoPath()); + if (!finagleRegistries.containsKey(finagleKey)) { + finagleRegistries.put( + finagleKey, new FinagleRegistry( + FinagleRegistryConfig.builder().build(), + new Disco(curators.get(curatorKey), dataSourceConfig.propertiesBasedConfig()) + ) + ); + } + + return new TranquilityEventWriter( + topic, + dataSourceConfig, + curators.get(curatorKey), + finagleRegistries.get(finagleKey) + ); + } +} diff --git a/kinesis/src/main/scala/com/metamx/tranquility/kafka/KafkaBeamUtils.scala b/kinesis/src/main/scala/com/metamx/tranquility/kafka/KafkaBeamUtils.scala new file mode 100644 index 0000000..33e5cdb --- /dev/null +++ b/kinesis/src/main/scala/com/metamx/tranquility/kafka/KafkaBeamUtils.scala @@ -0,0 +1,51 @@ +/* + * Licensed to Metamarkets Group Inc. 
(Metamarkets) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. Metamarkets licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +package com.metamx.tranquility.kafka + +import com.metamx.tranquility.config.DataSourceConfig +import com.metamx.tranquility.druid.DruidBeams +import com.metamx.tranquility.druid.DruidLocation +import com.metamx.tranquility.finagle.FinagleRegistry +import com.metamx.tranquility.kafka.model.PropertiesBasedKinesisConfig +import com.metamx.tranquility.tranquilizer.Tranquilizer +import org.apache.curator.framework.CuratorFramework +import scala.reflect.runtime.universe.typeTag + +object KafkaBeamUtils +{ + def createTranquilizer( + topic: String, + config: DataSourceConfig[PropertiesBasedKinesisConfig], + curator: CuratorFramework, + finagleRegistry: FinagleRegistry + ): Tranquilizer[Array[Byte]] = + { + DruidBeams.fromConfig(config, typeTag[Array[Byte]]) + .location( + DruidLocation.create( + config.propertiesBasedConfig.druidIndexingServiceName, + if (config.propertiesBasedConfig.useTopicAsDataSource) topic else config.dataSource + ) + ) + .curator(curator) + .finagleRegistry(finagleRegistry) + .buildTranquilizer(config.tranquilizerBuilder()) + } +} diff --git a/kinesis/src/test/java/com/metamx/tranquility/kafka/KafkaConsumerTest.java 
b/kinesis/src/test/java/com/metamx/tranquility/kafka/KafkaConsumerTest.java new file mode 100644 index 0000000..0c1fce8 --- /dev/null +++ b/kinesis/src/test/java/com/metamx/tranquility/kafka/KafkaConsumerTest.java @@ -0,0 +1,342 @@ +/* + * Licensed to Metamarkets Group Inc. (Metamarkets) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. Metamarkets licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. 
+ */ + +package com.metamx.tranquility.kafka; + +import com.fasterxml.jackson.databind.ObjectMapper; +import com.google.common.collect.ImmutableMap; +import com.google.common.collect.Lists; +import com.google.common.collect.Maps; +import com.google.common.io.Files; +import com.metamx.common.scala.Jackson$; +import com.metamx.tranquility.config.DataSourceConfig; +import com.metamx.tranquility.druid.DruidGuicer$; +import com.metamx.tranquility.kafka.model.MessageCounters; +import com.metamx.tranquility.kafka.model.PropertiesBasedKinesisConfig; +import com.metamx.tranquility.kafka.writer.TranquilityEventWriter; +import com.metamx.tranquility.kafka.writer.WriterController; +import io.druid.query.aggregation.AggregatorFactory; +import io.druid.segment.indexing.DataSchema; +import io.druid.segment.indexing.RealtimeIOConfig; +import io.druid.segment.indexing.RealtimeTuningConfig; +import io.druid.segment.realtime.FireDepartment; +import io.druid.segment.realtime.FireDepartmentMetrics; +import io.druid.segment.realtime.firehose.LocalFirehoseFactory; +import io.druid.segment.realtime.plumber.Plumber; +import io.druid.segment.realtime.plumber.PlumberSchool; +import junit.framework.Assert; +import kafka.common.OffsetMetadataAndError; +import kafka.common.TopicAndPartition; +import kafka.javaapi.OffsetFetchRequest; +import kafka.javaapi.OffsetFetchResponse; +import kafka.network.BlockingChannel; +import kafka.server.KafkaConfig; +import kafka.server.KafkaServerStartable; +import org.apache.commons.io.FileUtils; +import org.apache.curator.test.TestingServer; +import org.apache.kafka.clients.producer.KafkaProducer; +import org.apache.kafka.clients.producer.ProducerRecord; +import org.apache.kafka.common.serialization.StringSerializer; +import org.easymock.EasyMock; +import org.easymock.IAnswer; +import org.junit.AfterClass; +import org.junit.BeforeClass; +import org.junit.Test; +import org.skife.config.ConfigurationObjectFactory; + +import java.io.File; +import 
java.io.IOException; +import java.net.ServerSocket; +import java.util.Map; +import java.util.Properties; +import java.util.concurrent.CountDownLatch; + +public class KafkaConsumerTest +{ + private static final String GROUP_ID = "test"; + private static final String CLIENT_ID = "test"; + private static final String MESSAGE = "message"; + + private static File tempDir; + private static TestingServer zk; + private static KafkaServerStartable kafka; + private static KafkaProducer producer; + private static BlockingChannel channel; + private static Properties consumerProperties; + + @BeforeClass + public static void setUpBeforeClass() throws Exception + { + tempDir = Files.createTempDir(); + zk = new TestingServer(); + kafka = new KafkaServerStartable(getKafkaTestConfig()); + kafka.startup(); + + Properties props = new Properties(); + props.put( + "bootstrap.servers", + String.format("%s:%s", kafka.serverConfig().hostName(), kafka.serverConfig().port()) + ); + producer = new KafkaProducer<>(props, new StringSerializer(), new StringSerializer()); + + channel = new BlockingChannel( + kafka.serverConfig().hostName(), + kafka.serverConfig().port(), + BlockingChannel.UseDefaultBufferSize(), + BlockingChannel.UseDefaultBufferSize(), + 5000 + ); + channel.connect(); + + consumerProperties = new Properties(); + consumerProperties.setProperty("group.id", GROUP_ID); + consumerProperties.setProperty("zookeeper.connect", zk.getConnectString()); + consumerProperties.setProperty("kafka.zookeeper.connect", zk.getConnectString()); + consumerProperties.setProperty("commit.periodMillis", "90000"); + consumerProperties.setProperty("auto.offset.reset", "smallest"); + } + + @AfterClass + public static void tearDownAfterClass() throws Exception + { + channel.disconnect(); + producer.close(); + kafka.shutdown(); + zk.close(); + FileUtils.deleteDirectory(tempDir); + } + + private static KafkaConfig getKafkaTestConfig() throws Exception + { + int port; + try (ServerSocket server = new 
ServerSocket(0)) { + port = server.getLocalPort(); + } + + Properties props = new Properties(); + props.put("broker.id", "0"); + props.put("host.name", "localhost"); + props.put("port", String.valueOf(port)); + props.put("log.dir", tempDir.getPath()); + props.put("zookeeper.connect", zk.getConnectString()); + props.put("replica.socket.timeout.ms", "1500"); + return new KafkaConfig(props); + } + + @Test(timeout = 60_000L) + public void testStartConsumersNoCommit() throws Exception + { + final String topic = "testStartConsumersNoCommit"; + final int numMessages = 5; + final CountDownLatch latch = new CountDownLatch(numMessages); + + consumerProperties.setProperty("topicPattern", topic); + + TranquilityEventWriter mockEventWriter = EasyMock.mock(TranquilityEventWriter.class); + mockEventWriter.send((byte[]) EasyMock.anyObject()); + EasyMock.expectLastCall().andStubAnswer( + new IAnswer() + { + @Override + public Void answer() throws Throwable + { + latch.countDown(); + return null; + } + } + ); + + WriterController mockWriterController = EasyMock.mock(WriterController.class); + EasyMock.expect(mockWriterController.getWriter(topic)).andReturn(mockEventWriter).times(numMessages); + EasyMock.expect(mockWriterController.flushAll()).andReturn(Maps.newHashMap()); + + mockWriterController.stop(); + EasyMock.expectLastCall(); + + EasyMock.replay(mockWriterController, mockEventWriter); + + PropertiesBasedKinesisConfig config = new ConfigurationObjectFactory(consumerProperties).build( + PropertiesBasedKinesisConfig.class); + + FireDepartment fd = new FireDepartment( + new DataSchema(topic, null, new AggregatorFactory[]{}, null, new ObjectMapper()), + new RealtimeIOConfig( + new LocalFirehoseFactory(null, null, null), new PlumberSchool() + { + @Override + public Plumber findPlumber(DataSchema schema, RealtimeTuningConfig config, FireDepartmentMetrics metrics) + { + return null; + } + }, null + ), + null + ); + + Map> datasourceConfigs = ImmutableMap.of( + topic, + new 
DataSourceConfig<>( + topic, + config, + fireDepartmentToScalaMap(fd) + ) + ); + + // commitMillis is set high enough that the commit thread should not run during the test + KafkaConsumer kafkaConsumer = new KafkaConsumer( + config, + consumerProperties, + datasourceConfigs, + mockWriterController + ); + kafkaConsumer.start(); + + Assert.assertEquals("Unexpected consumer offset", -1, getConsumerOffset(topic)); + + for (int i = numMessages; i > 0; i--) { + producer.send(new ProducerRecord(topic, MESSAGE)).get(); + } + latch.await(); + + // check that offset wasn't committed since commit thread didn't run + Assert.assertEquals("Unexpected consumer offset", -1, getConsumerOffset(topic)); + + kafkaConsumer.stop(); + + // check that offset was committed on shutdown + Assert.assertEquals("Unexpected consumer offset", numMessages, getConsumerOffset(topic)); + EasyMock.verify(mockWriterController, mockEventWriter); + } + + @Test(timeout = 60_000L) + public void testStartConsumersWithCommitThread() throws Exception + { + final String topic = "testStartConsumersWithCommitThread"; + final int numMessages = 8; + final CountDownLatch latch = new CountDownLatch(numMessages); + + consumerProperties.setProperty("topicPattern", topic); + + TranquilityEventWriter mockEventWriter = EasyMock.mock(TranquilityEventWriter.class); + mockEventWriter.send((byte[]) EasyMock.anyObject()); + EasyMock.expectLastCall().andStubAnswer( + new IAnswer() + { + @Override + public Void answer() throws Throwable + { + latch.countDown(); + return null; + } + } + ); + + WriterController mockWriterController = EasyMock.mock(WriterController.class); + EasyMock.expect(mockWriterController.getWriter(topic)).andReturn(mockEventWriter).times(numMessages); + EasyMock.expect(mockWriterController.flushAll()).andReturn(Maps.newHashMap()).times(2); + + mockWriterController.stop(); + EasyMock.expectLastCall(); + + EasyMock.replay(mockWriterController, mockEventWriter); + + PropertiesBasedKinesisConfig config = new 
ConfigurationObjectFactory(consumerProperties).build( + PropertiesBasedKinesisConfig.class); + + FireDepartment fd = new FireDepartment( + new DataSchema(topic, null, new AggregatorFactory[]{}, null, new ObjectMapper()), + new RealtimeIOConfig( + new LocalFirehoseFactory(null, null, null), new PlumberSchool() + { + @Override + public Plumber findPlumber(DataSchema schema, RealtimeTuningConfig config, FireDepartmentMetrics metrics) + { + return null; + } + }, null + ), + null + ); + + Map> datasourceConfigs = ImmutableMap.of( + topic, + new DataSourceConfig<>( + topic, + config, + fireDepartmentToScalaMap(fd) + ) + ); + + // commitMillis is set high enough that the commit thread should not run during the test + KafkaConsumer kafkaConsumer = new KafkaConsumer( + config, + consumerProperties, + datasourceConfigs, + mockWriterController + ); + kafkaConsumer.start(); + + Assert.assertEquals("Unexpected consumer offset", -1, getConsumerOffset(topic)); + + for (int i = numMessages; i > 0; i--) { + producer.send(new ProducerRecord(topic, MESSAGE)).get(); + } + latch.await(); + + kafkaConsumer.commit(); + + // check that offset was committed since commit ran + Assert.assertEquals("Unexpected consumer offset", numMessages, getConsumerOffset(topic)); + + kafkaConsumer.stop(); + + Assert.assertEquals("Unexpected consumer offset", numMessages, getConsumerOffset(topic)); + EasyMock.verify(mockWriterController, mockEventWriter); + } + + private long getConsumerOffset(String topic) + { + TopicAndPartition partition = new TopicAndPartition(topic, 0); + OffsetFetchRequest fetchRequest = new OffsetFetchRequest( + GROUP_ID, Lists.newArrayList(partition), + (short) 0, // version 1 and above fetch from Kafka, version 0 fetches from ZooKeeper + 0, CLIENT_ID + ); + + if (!channel.isConnected()) { + channel.connect(); + } + + channel.send(fetchRequest.underlying()); + + OffsetFetchResponse fetchResponse = OffsetFetchResponse.readFrom(channel.receive().buffer()); + OffsetMetadataAndError 
result = fetchResponse.offsets().get(partition); + return result.offset(); + } + + public static scala.collection.immutable.Map fireDepartmentToScalaMap( + final FireDepartment fireDepartment + ) throws IOException + { + return Jackson$.MODULE$.newObjectMapper().readValue( + DruidGuicer$.MODULE$.Default().objectMapper().writeValueAsBytes(fireDepartment), + scala.collection.immutable.Map.class + ); + } +} diff --git a/kinesis/src/test/java/com/metamx/tranquility/kafka/writer/WriterControllerTest.java b/kinesis/src/test/java/com/metamx/tranquility/kafka/writer/WriterControllerTest.java new file mode 100644 index 0000000..e866c72 --- /dev/null +++ b/kinesis/src/test/java/com/metamx/tranquility/kafka/writer/WriterControllerTest.java @@ -0,0 +1,207 @@ +/* + * Licensed to Metamarkets Group Inc. (Metamarkets) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. Metamarkets licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. 
+ */ + +package com.metamx.tranquility.kafka.writer; + +import com.fasterxml.jackson.databind.ObjectMapper; +import com.google.common.collect.ImmutableMap; +import com.metamx.tranquility.config.DataSourceConfig; +import com.metamx.tranquility.kafka.KafkaConsumerTest; +import com.metamx.tranquility.kafka.model.MessageCounters; +import com.metamx.tranquility.kafka.model.PropertiesBasedKinesisConfig; +import io.druid.query.aggregation.AggregatorFactory; +import io.druid.segment.indexing.DataSchema; +import io.druid.segment.indexing.RealtimeIOConfig; +import io.druid.segment.indexing.RealtimeTuningConfig; +import io.druid.segment.realtime.FireDepartment; +import io.druid.segment.realtime.FireDepartmentMetrics; +import io.druid.segment.realtime.firehose.LocalFirehoseFactory; +import io.druid.segment.realtime.plumber.Plumber; +import io.druid.segment.realtime.plumber.PlumberSchool; +import junit.framework.Assert; +import org.easymock.EasyMock; +import org.junit.Before; +import org.junit.Test; +import org.skife.config.ConfigurationObjectFactory; + +import java.io.IOException; +import java.util.HashMap; +import java.util.Map; +import java.util.Properties; + +public class WriterControllerTest +{ + class TestableWriterController extends WriterController + { + + private Map mockWriters = new HashMap<>(); + + public TestableWriterController(Map> dataSourceConfigs) + { + super(dataSourceConfigs); + } + + protected TranquilityEventWriter createWriter( + String topic, + DataSourceConfig dataSourceConfig + ) + { + TranquilityEventWriter writer = EasyMock.mock(TranquilityEventWriter.class); + mockWriters.put(topic, writer); + return writer; + } + } + + private TestableWriterController writerController; + + @Before + public void setUp() throws IOException + { + Properties props = new Properties(); + props.setProperty("kafka.zookeeper.connect", "localhost:2181"); + props.setProperty("zookeeper.connect", "localhost:2181"); + + Properties propsTwitter = new Properties(props); + 
propsTwitter.setProperty("topicPattern", "twitter"); + + Properties propsTest = new Properties(props); + propsTest.setProperty("topicPattern", "test[0-9]"); + propsTest.setProperty("useTopicAsDataSource", "true"); + + FireDepartment fdTwitter = new FireDepartment( + new DataSchema("twitter", null, new AggregatorFactory[]{}, null, new ObjectMapper()), + new RealtimeIOConfig( + new LocalFirehoseFactory(null, null, null), new PlumberSchool() + { + @Override + public Plumber findPlumber(DataSchema schema, RealtimeTuningConfig config, FireDepartmentMetrics metrics) + { + return null; + } + }, null + ), + null + ); + + FireDepartment fdTest = new FireDepartment( + new DataSchema("test[0-9]", null, new AggregatorFactory[]{}, null, new ObjectMapper()), + new RealtimeIOConfig( + new LocalFirehoseFactory(null, null, null), new PlumberSchool() + { + @Override + public Plumber findPlumber(DataSchema schema, RealtimeTuningConfig config, FireDepartmentMetrics metrics) + { + return null; + } + }, null + ), + null + ); + + Map> datasourceConfigs = ImmutableMap.of( + "twitter", + new DataSourceConfig<>( + "twitter", + new ConfigurationObjectFactory(propsTwitter).build(PropertiesBasedKinesisConfig.class), + KafkaConsumerTest.fireDepartmentToScalaMap(fdTwitter) + ), + "test[0-9]", + new DataSourceConfig<>( + "test[0-9]", + new ConfigurationObjectFactory(propsTest).build(PropertiesBasedKinesisConfig.class), + KafkaConsumerTest.fireDepartmentToScalaMap(fdTest) + ) + ); + + writerController = new TestableWriterController(datasourceConfigs); + } + + @Test + public void testGetWriter() + { + Assert.assertEquals(0, writerController.mockWriters.size()); + + TranquilityEventWriter writerTwitter = writerController.getWriter("twitter"); + + Assert.assertEquals(1, writerController.mockWriters.size()); + Assert.assertSame(writerTwitter, writerController.mockWriters.get("twitter")); + Assert.assertSame(writerTwitter, writerController.getWriter("twitter")); + + TranquilityEventWriter writerTest0 = 
writerController.getWriter("test0"); + TranquilityEventWriter writerTest1 = writerController.getWriter("test1"); + + Assert.assertEquals(3, writerController.mockWriters.size()); + Assert.assertSame(writerTest0, writerController.getWriter("test0")); + Assert.assertSame(writerTest1, writerController.getWriter("test1")); + } + + @Test(expected = RuntimeException.class) + public void testGetWriterInvalid() + { + writerController.getWriter("test10"); + } + + @Test + public void testFlushAll() throws InterruptedException + { + writerController.getWriter("test0"); + writerController.getWriter("test1"); + TranquilityEventWriter mock0 = writerController.mockWriters.get("test0"); + TranquilityEventWriter mock1 = writerController.mockWriters.get("test1"); + + mock0.flush(); + mock1.flush(); + EasyMock.expect(mock0.getMessageCounters()).andReturn(new MessageCounters(1, 2, 3, 0)); + EasyMock.expect(mock1.getMessageCounters()).andReturn(new MessageCounters(4, 5, 6, 0)); + EasyMock.replay(mock0, mock1); + + Map results = writerController.flushAll(); + + EasyMock.verify(mock0, mock1); + + Assert.assertEquals(2, results.size()); + + Assert.assertEquals(1, results.get("test0").getReceivedCount()); + Assert.assertEquals(2, results.get("test0").getSentCount()); + Assert.assertEquals(3, results.get("test0").getDroppedCount()); + Assert.assertEquals(0, results.get("test0").getUnparseableCount()); + + Assert.assertEquals(4, results.get("test1").getReceivedCount()); + Assert.assertEquals(5, results.get("test1").getSentCount()); + Assert.assertEquals(6, results.get("test1").getDroppedCount()); + Assert.assertEquals(0, results.get("test1").getUnparseableCount()); + } + + @Test + public void testStop() + { + writerController.getWriter("test0"); + writerController.getWriter("test1"); + TranquilityEventWriter mock0 = writerController.mockWriters.get("test0"); + TranquilityEventWriter mock1 = writerController.mockWriters.get("test1"); + + mock0.stop(); + mock1.stop(); + EasyMock.replay(mock0, 
mock1); + + writerController.stop(); + + EasyMock.verify(mock0, mock1); + } +} From 966696998ee25a38b91e586d1b7b199d2d248c7f Mon Sep 17 00:00:00 2001 From: Brian O'Neill Date: Tue, 10 Oct 2017 22:06:38 -0400 Subject: [PATCH 2/4] more cleanup --- .../tranquility/kinesis/model/MessageCounters.java | 2 +- .../kinesis/writer/TranquilityEventWriter.java | 11 ++++++----- .../tranquility/kinesis/writer/WriterController.java | 5 +++-- .../KinesisBeamUtils.scala} | 6 +++--- 4 files changed, 13 insertions(+), 11 deletions(-) rename kinesis/src/main/scala/com/metamx/tranquility/{kafka/KafkaBeamUtils.scala => kinesis/KinesisBeamUtils.scala} (92%) diff --git a/kinesis/src/main/java/com/metamx/tranquility/kinesis/model/MessageCounters.java b/kinesis/src/main/java/com/metamx/tranquility/kinesis/model/MessageCounters.java index af9feef..d80ef31 100644 --- a/kinesis/src/main/java/com/metamx/tranquility/kinesis/model/MessageCounters.java +++ b/kinesis/src/main/java/com/metamx/tranquility/kinesis/model/MessageCounters.java @@ -17,7 +17,7 @@ * under the License. */ -package com.metamx.tranquility.kafka.model; +package com.metamx.tranquility.kinesis.model; /** * Used for passing received, sent, and failed message counts from SimpleTranquilizerAdapter. diff --git a/kinesis/src/main/java/com/metamx/tranquility/kinesis/writer/TranquilityEventWriter.java b/kinesis/src/main/java/com/metamx/tranquility/kinesis/writer/TranquilityEventWriter.java index f3214de..9ca8f9b 100644 --- a/kinesis/src/main/java/com/metamx/tranquility/kinesis/writer/TranquilityEventWriter.java +++ b/kinesis/src/main/java/com/metamx/tranquility/kinesis/writer/TranquilityEventWriter.java @@ -16,7 +16,7 @@ * specific language governing permissions and limitations * under the License. 
*/ -package com.metamx.tranquility.kafka.writer; +package com.metamx.tranquility.kinesis.writer; import com.fasterxml.jackson.databind.ObjectMapper; import com.google.common.base.Throwables; @@ -24,9 +24,9 @@ import com.metamx.common.parsers.ParseException; import com.metamx.tranquility.config.DataSourceConfig; import com.metamx.tranquility.finagle.FinagleRegistry; -import com.metamx.tranquility.kafka.KafkaBeamUtils; -import com.metamx.tranquility.kafka.model.MessageCounters; -import com.metamx.tranquility.kafka.model.PropertiesBasedKinesisConfig; +import com.metamx.tranquility.kinesis.KinesisBeamUtils; +import com.metamx.tranquility.kinesis.model.MessageCounters; +import com.metamx.tranquility.kinesis.model.PropertiesBasedKinesisConfig; import com.metamx.tranquility.tranquilizer.MessageDroppedException; import com.metamx.tranquility.tranquilizer.Tranquilizer; import com.twitter.util.FutureEventListener; @@ -61,7 +62,7 @@ public TranquilityEventWriter( ) { this.dataSourceConfig = dataSourceConfig; - this.tranquilizer = KafkaBeamUtils.createTranquilizer( + this.tranquilizer = KinesisBeamUtils.createTranquilizer( topic, dataSourceConfig, curator, diff --git a/kinesis/src/main/java/com/metamx/tranquility/kinesis/writer/WriterController.java b/kinesis/src/main/java/com/metamx/tranquility/kinesis/writer/WriterController.java index b942912..98b7e88 100644 --- a/kinesis/src/main/java/com/metamx/tranquility/kinesis/writer/WriterController.java +++ b/kinesis/src/main/java/com/metamx/tranquility/kinesis/writer/WriterController.java @@ -16,7 +16,7 @@ * specific language governing permissions and limitations * under the License.
*/ -package com.metamx.tranquility.kafka.writer; +package com.metamx.tranquility.kinesis.writer; import com.google.common.primitives.Ints; import com.metamx.common.logger.Logger; @@ -25,7 +25,8 @@ import com.metamx.tranquility.finagle.FinagleRegistry; import com.metamx.tranquility.finagle.FinagleRegistryConfig; -import com.metamx.tranquility.kafka.model.MessageCounters; -import com.metamx.tranquility.kafka.model.PropertiesBasedKinesisConfig; +import com.metamx.tranquility.kinesis.model.MessageCounters; +import com.metamx.tranquility.kinesis.model.PropertiesBasedKinesisConfig; +import com.metamx.tranquility.kinesis.writer.TranquilityEventWriter; import org.apache.curator.RetryPolicy; import org.apache.curator.framework.CuratorFramework; import org.apache.curator.framework.CuratorFrameworkFactory; diff --git a/kinesis/src/main/scala/com/metamx/tranquility/kafka/KafkaBeamUtils.scala b/kinesis/src/main/scala/com/metamx/tranquility/kinesis/KinesisBeamUtils.scala similarity index 92% rename from kinesis/src/main/scala/com/metamx/tranquility/kafka/KafkaBeamUtils.scala rename to kinesis/src/main/scala/com/metamx/tranquility/kinesis/KinesisBeamUtils.scala index 33e5cdb..6b7612a 100644 --- a/kinesis/src/main/scala/com/metamx/tranquility/kafka/KafkaBeamUtils.scala +++ b/kinesis/src/main/scala/com/metamx/tranquility/kinesis/KinesisBeamUtils.scala @@ -17,18 +17,18 @@ * under the License.
*/ -package com.metamx.tranquility.kafka +package com.metamx.tranquility.kinesis import com.metamx.tranquility.config.DataSourceConfig import com.metamx.tranquility.druid.DruidBeams import com.metamx.tranquility.druid.DruidLocation import com.metamx.tranquility.finagle.FinagleRegistry -import com.metamx.tranquility.kafka.model.PropertiesBasedKinesisConfig +import com.metamx.tranquility.kinesis.model.PropertiesBasedKinesisConfig import com.metamx.tranquility.tranquilizer.Tranquilizer import org.apache.curator.framework.CuratorFramework import scala.reflect.runtime.universe.typeTag -object KafkaBeamUtils +object KinesisBeamUtils { def createTranquilizer( topic: String, From ca0eefe25596a7b2ca6e479e7b1af7565a8944dc Mon Sep 17 00:00:00 2001 From: Brian O'Neill Date: Tue, 10 Oct 2017 22:33:50 -0400 Subject: [PATCH 3/4] config files for testing --- .../tranquility/kinesis/KinesisConsumer.java | 6 +- .../kinesis/KinesisEventConsumer.java | 64 ++++ .../tranquility/kafka/KafkaConsumerTest.java | 342 ------------------ .../kafka/writer/WriterControllerTest.java | 207 ----------- .../kinesis/KinesisConsumerTest.java | 44 +++ kinesis/src/test/resources/kinesis.json | 167 +++++++++ kinesis/src/test/resources/kinesis.yaml | 83 +++++ 7 files changed, 359 insertions(+), 554 deletions(-) create mode 100644 kinesis/src/main/java/com/metamx/tranquility/kinesis/KinesisEventConsumer.java delete mode 100644 kinesis/src/test/java/com/metamx/tranquility/kafka/KafkaConsumerTest.java delete mode 100644 kinesis/src/test/java/com/metamx/tranquility/kafka/writer/WriterControllerTest.java create mode 100644 kinesis/src/test/java/com/metamx/tranquility/kinesis/KinesisConsumerTest.java create mode 100644 kinesis/src/test/resources/kinesis.json create mode 100644 kinesis/src/test/resources/kinesis.yaml diff --git a/kinesis/src/main/java/com/metamx/tranquility/kinesis/KinesisConsumer.java b/kinesis/src/main/java/com/metamx/tranquility/kinesis/KinesisConsumer.java index 6212ec0..a7fd2f0 100644 --- 
a/kinesis/src/main/java/com/metamx/tranquility/kinesis/KinesisConsumer.java +++ b/kinesis/src/main/java/com/metamx/tranquility/kinesis/KinesisConsumer.java @@ -35,11 +35,7 @@ import java.util.concurrent.locks.ReentrantReadWriteLock; /** - * Spawns a number of threads to read messages from Kafka topics and write them by calling - * WriterController.getWriter(topic).send(). Will periodically call WriterController.flushAll() and when this completes - * will call ConsumerConnector.commitOffsets() to save the last written offset to ZooKeeper. This implementation - * guarantees that any events in Kafka will be read at least once even in case of a failure condition but does not - * guarantee that duplication will not occur. + * TODO: Port this to Kinesis */ public class KinesisConsumer { diff --git a/kinesis/src/main/java/com/metamx/tranquility/kinesis/KinesisEventConsumer.java b/kinesis/src/main/java/com/metamx/tranquility/kinesis/KinesisEventConsumer.java new file mode 100644 index 0000000..7e5eb95 --- /dev/null +++ b/kinesis/src/main/java/com/metamx/tranquility/kinesis/KinesisEventConsumer.java @@ -0,0 +1,64 @@ +package com.metamx.tranquility.kinesis; + +import java.util.List; + +import com.amazonaws.services.kinesis.clientlibrary.lib.worker.ShutdownReason; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import com.amazonaws.auth.DefaultAWSCredentialsProviderChain; +import com.amazonaws.services.kinesis.clientlibrary.interfaces.IRecordProcessor; +import com.amazonaws.services.kinesis.clientlibrary.interfaces.IRecordProcessorCheckpointer; +import com.amazonaws.services.kinesis.clientlibrary.interfaces.IRecordProcessorFactory; +import com.amazonaws.services.kinesis.clientlibrary.lib.worker.InitialPositionInStream; +import com.amazonaws.services.kinesis.clientlibrary.lib.worker.KinesisClientLibConfiguration; +import com.amazonaws.services.kinesis.clientlibrary.lib.worker.Worker; +import com.amazonaws.services.kinesis.model.Record; +import 
com.amazonaws.services.kinesis.producer.KinesisProducerConfiguration; + +public class KinesisEventConsumer implements IRecordProcessorFactory { + private static final Logger LOGGER = LoggerFactory.getLogger(KinesisEventConsumer.class); + private Worker.Builder builder; + + public KinesisEventConsumer(String propertiesFile, String streamName, String appName, String initialPosition) { + KinesisProducerConfiguration config = KinesisProducerConfiguration.fromPropertiesFile(propertiesFile); + + InitialPositionInStream position = InitialPositionInStream.valueOf(initialPosition); + + KinesisClientLibConfiguration clientConfig = new KinesisClientLibConfiguration(appName, streamName, + new DefaultAWSCredentialsProviderChain(), appName) + .withRegionName(config.getRegion()) + .withInitialPositionInStream(position); + + this.builder = new Worker.Builder().recordProcessorFactory(this).config(clientConfig); + } + + public void start(){ + this.builder.build().run(); + } + + @Override + public IRecordProcessor createProcessor() { + LOGGER.debug("Creating recordProcessor."); + return new IRecordProcessor() { + @Override + public void initialize(String shardId) {} + + @Override + public void processRecords(List records, IRecordProcessorCheckpointer checkpointer) { + for (Record record : records){ + byte[] bytes = new byte[record.getData().remaining()]; + record.getData().get(bytes); + String data = new String(bytes); + LOGGER.debug("Received [{}]", data); + } + } + + @Override + public void shutdown(IRecordProcessorCheckpointer checkpointer, ShutdownReason reason) { + LOGGER.debug("Shutting down [{}]", reason); + } + }; + + } +} diff --git a/kinesis/src/test/java/com/metamx/tranquility/kafka/KafkaConsumerTest.java b/kinesis/src/test/java/com/metamx/tranquility/kafka/KafkaConsumerTest.java deleted file mode 100644 index 0c1fce8..0000000 --- a/kinesis/src/test/java/com/metamx/tranquility/kafka/KafkaConsumerTest.java +++ /dev/null @@ -1,342 +0,0 @@ -/* - * Licensed to Metamarkets Group Inc. 
(Metamarkets) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. Metamarkets licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, - * software distributed under the License is distributed on an - * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY - * KIND, either express or implied. See the License for the - * specific language governing permissions and limitations - * under the License. - */ - -package com.metamx.tranquility.kafka; - -import com.fasterxml.jackson.databind.ObjectMapper; -import com.google.common.collect.ImmutableMap; -import com.google.common.collect.Lists; -import com.google.common.collect.Maps; -import com.google.common.io.Files; -import com.metamx.common.scala.Jackson$; -import com.metamx.tranquility.config.DataSourceConfig; -import com.metamx.tranquility.druid.DruidGuicer$; -import com.metamx.tranquility.kafka.model.MessageCounters; -import com.metamx.tranquility.kafka.model.PropertiesBasedKinesisConfig; -import com.metamx.tranquility.kafka.writer.TranquilityEventWriter; -import com.metamx.tranquility.kafka.writer.WriterController; -import io.druid.query.aggregation.AggregatorFactory; -import io.druid.segment.indexing.DataSchema; -import io.druid.segment.indexing.RealtimeIOConfig; -import io.druid.segment.indexing.RealtimeTuningConfig; -import io.druid.segment.realtime.FireDepartment; -import io.druid.segment.realtime.FireDepartmentMetrics; -import io.druid.segment.realtime.firehose.LocalFirehoseFactory; -import io.druid.segment.realtime.plumber.Plumber; -import io.druid.segment.realtime.plumber.PlumberSchool; -import junit.framework.Assert; -import 
kafka.common.OffsetMetadataAndError; -import kafka.common.TopicAndPartition; -import kafka.javaapi.OffsetFetchRequest; -import kafka.javaapi.OffsetFetchResponse; -import kafka.network.BlockingChannel; -import kafka.server.KafkaConfig; -import kafka.server.KafkaServerStartable; -import org.apache.commons.io.FileUtils; -import org.apache.curator.test.TestingServer; -import org.apache.kafka.clients.producer.KafkaProducer; -import org.apache.kafka.clients.producer.ProducerRecord; -import org.apache.kafka.common.serialization.StringSerializer; -import org.easymock.EasyMock; -import org.easymock.IAnswer; -import org.junit.AfterClass; -import org.junit.BeforeClass; -import org.junit.Test; -import org.skife.config.ConfigurationObjectFactory; - -import java.io.File; -import java.io.IOException; -import java.net.ServerSocket; -import java.util.Map; -import java.util.Properties; -import java.util.concurrent.CountDownLatch; - -public class KafkaConsumerTest -{ - private static final String GROUP_ID = "test"; - private static final String CLIENT_ID = "test"; - private static final String MESSAGE = "message"; - - private static File tempDir; - private static TestingServer zk; - private static KafkaServerStartable kafka; - private static KafkaProducer producer; - private static BlockingChannel channel; - private static Properties consumerProperties; - - @BeforeClass - public static void setUpBeforeClass() throws Exception - { - tempDir = Files.createTempDir(); - zk = new TestingServer(); - kafka = new KafkaServerStartable(getKafkaTestConfig()); - kafka.startup(); - - Properties props = new Properties(); - props.put( - "bootstrap.servers", - String.format("%s:%s", kafka.serverConfig().hostName(), kafka.serverConfig().port()) - ); - producer = new KafkaProducer<>(props, new StringSerializer(), new StringSerializer()); - - channel = new BlockingChannel( - kafka.serverConfig().hostName(), - kafka.serverConfig().port(), - BlockingChannel.UseDefaultBufferSize(), - 
BlockingChannel.UseDefaultBufferSize(), - 5000 - ); - channel.connect(); - - consumerProperties = new Properties(); - consumerProperties.setProperty("group.id", GROUP_ID); - consumerProperties.setProperty("zookeeper.connect", zk.getConnectString()); - consumerProperties.setProperty("kafka.zookeeper.connect", zk.getConnectString()); - consumerProperties.setProperty("commit.periodMillis", "90000"); - consumerProperties.setProperty("auto.offset.reset", "smallest"); - } - - @AfterClass - public static void tearDownAfterClass() throws Exception - { - channel.disconnect(); - producer.close(); - kafka.shutdown(); - zk.close(); - FileUtils.deleteDirectory(tempDir); - } - - private static KafkaConfig getKafkaTestConfig() throws Exception - { - int port; - try (ServerSocket server = new ServerSocket(0)) { - port = server.getLocalPort(); - } - - Properties props = new Properties(); - props.put("broker.id", "0"); - props.put("host.name", "localhost"); - props.put("port", String.valueOf(port)); - props.put("log.dir", tempDir.getPath()); - props.put("zookeeper.connect", zk.getConnectString()); - props.put("replica.socket.timeout.ms", "1500"); - return new KafkaConfig(props); - } - - @Test(timeout = 60_000L) - public void testStartConsumersNoCommit() throws Exception - { - final String topic = "testStartConsumersNoCommit"; - final int numMessages = 5; - final CountDownLatch latch = new CountDownLatch(numMessages); - - consumerProperties.setProperty("topicPattern", topic); - - TranquilityEventWriter mockEventWriter = EasyMock.mock(TranquilityEventWriter.class); - mockEventWriter.send((byte[]) EasyMock.anyObject()); - EasyMock.expectLastCall().andStubAnswer( - new IAnswer() - { - @Override - public Void answer() throws Throwable - { - latch.countDown(); - return null; - } - } - ); - - WriterController mockWriterController = EasyMock.mock(WriterController.class); - EasyMock.expect(mockWriterController.getWriter(topic)).andReturn(mockEventWriter).times(numMessages); - 
EasyMock.expect(mockWriterController.flushAll()).andReturn(Maps.newHashMap()); - - mockWriterController.stop(); - EasyMock.expectLastCall(); - - EasyMock.replay(mockWriterController, mockEventWriter); - - PropertiesBasedKinesisConfig config = new ConfigurationObjectFactory(consumerProperties).build( - PropertiesBasedKinesisConfig.class); - - FireDepartment fd = new FireDepartment( - new DataSchema(topic, null, new AggregatorFactory[]{}, null, new ObjectMapper()), - new RealtimeIOConfig( - new LocalFirehoseFactory(null, null, null), new PlumberSchool() - { - @Override - public Plumber findPlumber(DataSchema schema, RealtimeTuningConfig config, FireDepartmentMetrics metrics) - { - return null; - } - }, null - ), - null - ); - - Map> datasourceConfigs = ImmutableMap.of( - topic, - new DataSourceConfig<>( - topic, - config, - fireDepartmentToScalaMap(fd) - ) - ); - - // commitMillis is set high enough that the commit thread should not run during the test - KafkaConsumer kafkaConsumer = new KafkaConsumer( - config, - consumerProperties, - datasourceConfigs, - mockWriterController - ); - kafkaConsumer.start(); - - Assert.assertEquals("Unexpected consumer offset", -1, getConsumerOffset(topic)); - - for (int i = numMessages; i > 0; i--) { - producer.send(new ProducerRecord(topic, MESSAGE)).get(); - } - latch.await(); - - // check that offset wasn't committed since commit thread didn't run - Assert.assertEquals("Unexpected consumer offset", -1, getConsumerOffset(topic)); - - kafkaConsumer.stop(); - - // check that offset was committed on shutdown - Assert.assertEquals("Unexpected consumer offset", numMessages, getConsumerOffset(topic)); - EasyMock.verify(mockWriterController, mockEventWriter); - } - - @Test(timeout = 60_000L) - public void testStartConsumersWithCommitThread() throws Exception - { - final String topic = "testStartConsumersWithCommitThread"; - final int numMessages = 8; - final CountDownLatch latch = new CountDownLatch(numMessages); - - 
consumerProperties.setProperty("topicPattern", topic); - - TranquilityEventWriter mockEventWriter = EasyMock.mock(TranquilityEventWriter.class); - mockEventWriter.send((byte[]) EasyMock.anyObject()); - EasyMock.expectLastCall().andStubAnswer( - new IAnswer() - { - @Override - public Void answer() throws Throwable - { - latch.countDown(); - return null; - } - } - ); - - WriterController mockWriterController = EasyMock.mock(WriterController.class); - EasyMock.expect(mockWriterController.getWriter(topic)).andReturn(mockEventWriter).times(numMessages); - EasyMock.expect(mockWriterController.flushAll()).andReturn(Maps.newHashMap()).times(2); - - mockWriterController.stop(); - EasyMock.expectLastCall(); - - EasyMock.replay(mockWriterController, mockEventWriter); - - PropertiesBasedKinesisConfig config = new ConfigurationObjectFactory(consumerProperties).build( - PropertiesBasedKinesisConfig.class); - - FireDepartment fd = new FireDepartment( - new DataSchema(topic, null, new AggregatorFactory[]{}, null, new ObjectMapper()), - new RealtimeIOConfig( - new LocalFirehoseFactory(null, null, null), new PlumberSchool() - { - @Override - public Plumber findPlumber(DataSchema schema, RealtimeTuningConfig config, FireDepartmentMetrics metrics) - { - return null; - } - }, null - ), - null - ); - - Map> datasourceConfigs = ImmutableMap.of( - topic, - new DataSourceConfig<>( - topic, - config, - fireDepartmentToScalaMap(fd) - ) - ); - - // commitMillis is set high enough that the commit thread should not run during the test - KafkaConsumer kafkaConsumer = new KafkaConsumer( - config, - consumerProperties, - datasourceConfigs, - mockWriterController - ); - kafkaConsumer.start(); - - Assert.assertEquals("Unexpected consumer offset", -1, getConsumerOffset(topic)); - - for (int i = numMessages; i > 0; i--) { - producer.send(new ProducerRecord(topic, MESSAGE)).get(); - } - latch.await(); - - kafkaConsumer.commit(); - - // check that offset was committed since commit ran - 
Assert.assertEquals("Unexpected consumer offset", numMessages, getConsumerOffset(topic)); - - kafkaConsumer.stop(); - - Assert.assertEquals("Unexpected consumer offset", numMessages, getConsumerOffset(topic)); - EasyMock.verify(mockWriterController, mockEventWriter); - } - - private long getConsumerOffset(String topic) - { - TopicAndPartition partition = new TopicAndPartition(topic, 0); - OffsetFetchRequest fetchRequest = new OffsetFetchRequest( - GROUP_ID, Lists.newArrayList(partition), - (short) 0, // version 1 and above fetch from Kafka, version 0 fetches from ZooKeeper - 0, CLIENT_ID - ); - - if (!channel.isConnected()) { - channel.connect(); - } - - channel.send(fetchRequest.underlying()); - - OffsetFetchResponse fetchResponse = OffsetFetchResponse.readFrom(channel.receive().buffer()); - OffsetMetadataAndError result = fetchResponse.offsets().get(partition); - return result.offset(); - } - - public static scala.collection.immutable.Map fireDepartmentToScalaMap( - final FireDepartment fireDepartment - ) throws IOException - { - return Jackson$.MODULE$.newObjectMapper().readValue( - DruidGuicer$.MODULE$.Default().objectMapper().writeValueAsBytes(fireDepartment), - scala.collection.immutable.Map.class - ); - } -} diff --git a/kinesis/src/test/java/com/metamx/tranquility/kafka/writer/WriterControllerTest.java b/kinesis/src/test/java/com/metamx/tranquility/kafka/writer/WriterControllerTest.java deleted file mode 100644 index e866c72..0000000 --- a/kinesis/src/test/java/com/metamx/tranquility/kafka/writer/WriterControllerTest.java +++ /dev/null @@ -1,207 +0,0 @@ -/* - * Licensed to Metamarkets Group Inc. (Metamarkets) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. Metamarkets licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. 
You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, - * software distributed under the License is distributed on an - * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY - * KIND, either express or implied. See the License for the - * specific language governing permissions and limitations - * under the License. - */ - -package com.metamx.tranquility.kafka.writer; - -import com.fasterxml.jackson.databind.ObjectMapper; -import com.google.common.collect.ImmutableMap; -import com.metamx.tranquility.config.DataSourceConfig; -import com.metamx.tranquility.kafka.KafkaConsumerTest; -import com.metamx.tranquility.kafka.model.MessageCounters; -import com.metamx.tranquility.kafka.model.PropertiesBasedKinesisConfig; -import io.druid.query.aggregation.AggregatorFactory; -import io.druid.segment.indexing.DataSchema; -import io.druid.segment.indexing.RealtimeIOConfig; -import io.druid.segment.indexing.RealtimeTuningConfig; -import io.druid.segment.realtime.FireDepartment; -import io.druid.segment.realtime.FireDepartmentMetrics; -import io.druid.segment.realtime.firehose.LocalFirehoseFactory; -import io.druid.segment.realtime.plumber.Plumber; -import io.druid.segment.realtime.plumber.PlumberSchool; -import junit.framework.Assert; -import org.easymock.EasyMock; -import org.junit.Before; -import org.junit.Test; -import org.skife.config.ConfigurationObjectFactory; - -import java.io.IOException; -import java.util.HashMap; -import java.util.Map; -import java.util.Properties; - -public class WriterControllerTest -{ - class TestableWriterController extends WriterController - { - - private Map mockWriters = new HashMap<>(); - - public TestableWriterController(Map> dataSourceConfigs) - { - super(dataSourceConfigs); - } - - protected TranquilityEventWriter createWriter( - String topic, - DataSourceConfig dataSourceConfig - ) - { - TranquilityEventWriter writer = 
EasyMock.mock(TranquilityEventWriter.class); - mockWriters.put(topic, writer); - return writer; - } - } - - private TestableWriterController writerController; - - @Before - public void setUp() throws IOException - { - Properties props = new Properties(); - props.setProperty("kafka.zookeeper.connect", "localhost:2181"); - props.setProperty("zookeeper.connect", "localhost:2181"); - - Properties propsTwitter = new Properties(props); - propsTwitter.setProperty("topicPattern", "twitter"); - - Properties propsTest = new Properties(props); - propsTest.setProperty("topicPattern", "test[0-9]"); - propsTest.setProperty("useTopicAsDataSource", "true"); - - FireDepartment fdTwitter = new FireDepartment( - new DataSchema("twitter", null, new AggregatorFactory[]{}, null, new ObjectMapper()), - new RealtimeIOConfig( - new LocalFirehoseFactory(null, null, null), new PlumberSchool() - { - @Override - public Plumber findPlumber(DataSchema schema, RealtimeTuningConfig config, FireDepartmentMetrics metrics) - { - return null; - } - }, null - ), - null - ); - - FireDepartment fdTest = new FireDepartment( - new DataSchema("test[0-9]", null, new AggregatorFactory[]{}, null, new ObjectMapper()), - new RealtimeIOConfig( - new LocalFirehoseFactory(null, null, null), new PlumberSchool() - { - @Override - public Plumber findPlumber(DataSchema schema, RealtimeTuningConfig config, FireDepartmentMetrics metrics) - { - return null; - } - }, null - ), - null - ); - - Map> datasourceConfigs = ImmutableMap.of( - "twitter", - new DataSourceConfig<>( - "twitter", - new ConfigurationObjectFactory(propsTwitter).build(PropertiesBasedKinesisConfig.class), - KafkaConsumerTest.fireDepartmentToScalaMap(fdTwitter) - ), - "test[0-9]", - new DataSourceConfig<>( - "test[0-9]", - new ConfigurationObjectFactory(propsTest).build(PropertiesBasedKinesisConfig.class), - KafkaConsumerTest.fireDepartmentToScalaMap(fdTest) - ) - ); - - writerController = new TestableWriterController(datasourceConfigs); - } - - @Test - 
public void testGetWriter() - { - Assert.assertEquals(0, writerController.mockWriters.size()); - - TranquilityEventWriter writerTwitter = writerController.getWriter("twitter"); - - Assert.assertEquals(1, writerController.mockWriters.size()); - Assert.assertSame(writerTwitter, writerController.mockWriters.get("twitter")); - Assert.assertSame(writerTwitter, writerController.getWriter("twitter")); - - TranquilityEventWriter writerTest0 = writerController.getWriter("test0"); - TranquilityEventWriter writerTest1 = writerController.getWriter("test1"); - - Assert.assertEquals(3, writerController.mockWriters.size()); - Assert.assertSame(writerTest0, writerController.getWriter("test0")); - Assert.assertSame(writerTest1, writerController.getWriter("test1")); - } - - @Test(expected = RuntimeException.class) - public void testGetWriterInvalid() - { - writerController.getWriter("test10"); - } - - @Test - public void testFlushAll() throws InterruptedException - { - writerController.getWriter("test0"); - writerController.getWriter("test1"); - TranquilityEventWriter mock0 = writerController.mockWriters.get("test0"); - TranquilityEventWriter mock1 = writerController.mockWriters.get("test1"); - - mock0.flush(); - mock1.flush(); - EasyMock.expect(mock0.getMessageCounters()).andReturn(new MessageCounters(1, 2, 3, 0)); - EasyMock.expect(mock1.getMessageCounters()).andReturn(new MessageCounters(4, 5, 6, 0)); - EasyMock.replay(mock0, mock1); - - Map results = writerController.flushAll(); - - EasyMock.verify(mock0, mock1); - - Assert.assertEquals(2, results.size()); - - Assert.assertEquals(1, results.get("test0").getReceivedCount()); - Assert.assertEquals(2, results.get("test0").getSentCount()); - Assert.assertEquals(3, results.get("test0").getDroppedCount()); - Assert.assertEquals(0, results.get("test0").getUnparseableCount()); - - Assert.assertEquals(4, results.get("test1").getReceivedCount()); - Assert.assertEquals(5, results.get("test1").getSentCount()); - Assert.assertEquals(6, 
results.get("test1").getDroppedCount()); - Assert.assertEquals(0, results.get("test1").getUnparseableCount()); - } - - @Test - public void testStop() - { - writerController.getWriter("test0"); - writerController.getWriter("test1"); - TranquilityEventWriter mock0 = writerController.mockWriters.get("test0"); - TranquilityEventWriter mock1 = writerController.mockWriters.get("test1"); - - mock0.stop(); - mock1.stop(); - EasyMock.replay(mock0, mock1); - - writerController.stop(); - - EasyMock.verify(mock0, mock1); - } -} diff --git a/kinesis/src/test/java/com/metamx/tranquility/kinesis/KinesisConsumerTest.java b/kinesis/src/test/java/com/metamx/tranquility/kinesis/KinesisConsumerTest.java new file mode 100644 index 0000000..ee02118 --- /dev/null +++ b/kinesis/src/test/java/com/metamx/tranquility/kinesis/KinesisConsumerTest.java @@ -0,0 +1,44 @@ +/* + * Licensed to Metamarkets Group Inc. (Metamarkets) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. Metamarkets licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. 
+ */ + +package com.metamx.tranquility.kinesis; + +import org.junit.AfterClass; +import org.junit.BeforeClass; +import org.junit.Test; + +public class KinesisConsumerTest +{ + + @BeforeClass + public static void setUpBeforeClass() throws Exception + { + } + + @AfterClass + public static void tearDownAfterClass() throws Exception + { + } + + @Test(timeout = 60_000L) + public void testConsumer() throws Exception + { + + } +} diff --git a/kinesis/src/test/resources/kinesis.json b/kinesis/src/test/resources/kinesis.json new file mode 100644 index 0000000..ab8fcc0 --- /dev/null +++ b/kinesis/src/test/resources/kinesis.json @@ -0,0 +1,167 @@ +{ + "dataSources" : [ + { + "spec" : { + "dataSchema" : { + "parser" : { + "type" : "string", + "parseSpec" : { + "timestampSpec" : { + "format" : "auto", + "column" : "timestamp" + }, + "dimensionsSpec" : { + "spatialDimensions" : [ + { + "dims" : [ + "lat", + "lon" + ], + "dimName" : "geo" + } + ], + "dimensions" : [ + "text", + "hashtags", + "lat", + "lon", + "source", + "retweet", + "lang", + "utc_offset", + "screen_name", + "verified" + ] + }, + "format" : "json" + } + }, + "dataSource" : "twitter", + "granularitySpec" : { + "segmentGranularity" : "hour", + "type" : "uniform", + "queryGranularity" : "none" + }, + "metricsSpec" : [ + { + "type" : "count", + "name" : "tweets" + }, + { + "fieldName" : "followers", + "type" : "longSum", + "name" : "followers" + }, + { + "name" : "retweets", + "type" : "longSum", + "fieldName" : "retweets" + }, + { + "fieldName" : "friends", + "type" : "longSum", + "name" : "friends" + }, + { + "name" : "statuses", + "type" : "longSum", + "fieldName" : "statuses" + } + ] + }, + "tuningConfig" : { + "maxRowsInMemory" : "100000", + "type" : "realtime", + "windowPeriod" : "PT10M", + "intermediatePersistPeriod" : "PT10M" + } + }, + "properties" : { + "topicPattern.priority" : "1", + "topicPattern" : "twitter" + } + }, + { + "spec" : { + "dataSchema" : { + "granularitySpec" : { + "queryGranularity" : 
"none", + "type" : "uniform", + "segmentGranularity" : "hour" + }, + "dataSource" : "wikipedia", + "parser" : { + "type" : "string", + "parseSpec" : { + "timestampSpec" : { + "format" : "auto", + "column" : "timestamp" + }, + "format" : "json", + "dimensionsSpec" : { + "dimensions" : [ + "page", + "language", + "user", + "unpatrolled", + "newPage", + "robot", + "anonymous", + "namespace", + "continent", + "country", + "region", + "city" + ] + } + } + }, + "metricsSpec" : [ + { + "type" : "count", + "name" : "count" + }, + { + "type" : "doubleSum", + "name" : "added", + "fieldName" : "added" + }, + { + "name" : "deleted", + "type" : "doubleSum", + "fieldName" : "deleted" + }, + { + "name" : "delta", + "type" : "doubleSum", + "fieldName" : "delta" + } + ] + }, + "tuningConfig" : { + "type" : "realtime", + "intermediatePersistPeriod" : "PT10M", + "windowPeriod" : "PT10M", + "maxRowsInMemory" : 75000 + } + }, + "properties" : { + "task.partitions" : "2", + "task.replicants" : "2", + "topicPattern" : "wikipedia.*", + "topicPattern.priority" : "1" + } + } + ], + "properties" : { + "zookeeper.connect" : "localhost:2181", + "zookeeper.timeout" : "PT20S", + "druid.selectors.indexing.serviceName" : "druid/overlord", + "druid.discovery.curator.path" : "/druid/discovery", + "kinesis.zookeeper.connect" : "localhost:2181", + "kafka.group.id" : "tranquility-kafka", + "consumer.numThreads" : "2", + "commit.periodMillis" : "15000", + "reportDropsAsExceptions" : "false" + } +} diff --git a/kinesis/src/test/resources/kinesis.yaml b/kinesis/src/test/resources/kinesis.yaml new file mode 100644 index 0000000..17e5ea9 --- /dev/null +++ b/kinesis/src/test/resources/kinesis.yaml @@ -0,0 +1,83 @@ +dataSources: + - spec: + dataSchema: + dataSource: twitter + parser: + type: string + parseSpec: + format: json + timestampSpec: + column: timestamp + format: auto + dimensionsSpec: + dimensions: [text, hashtags, lat, lon, source, retweet, lang, utc_offset, screen_name, verified] + 
spatialDimensions: [ { dimName: geo, dims: [lat, lon]} ] + + metricsSpec: + - { type: count, name: tweets } + - { type: longSum, name: followers, fieldName: followers } + - { type: longSum, name: retweets, fieldName: retweets } + - { type: longSum, name: friends, fieldName: friends } + - { type: longSum, name: statuses, fieldName: statuses } + + granularitySpec: + type: uniform + segmentGranularity: hour + queryGranularity: none + + tuningConfig: + type: realtime + maxRowsInMemory: 100000 + intermediatePersistPeriod: PT10M + windowPeriod: PT10M + + properties: + topicPattern: twitter + topicPattern.priority: 1 + + - spec: + dataSchema: + dataSource: wikipedia + parser: + type: string + parseSpec: + format: json + timestampSpec: + column: timestamp + format: auto + dimensionsSpec: + dimensions: [page, language, user, unpatrolled, newPage, robot, anonymous, namespace, continent, country, region, city] + + metricsSpec: + - { type: count, name: count } + - { type: doubleSum, name: added, fieldName: added } + - { type: doubleSum, name: deleted, fieldName: deleted } + - { type: doubleSum, name: delta, fieldName: delta } + + granularitySpec: + type: uniform + segmentGranularity: hour + queryGranularity: none + + tuningConfig: + type: realtime + windowPeriod: PT10M + intermediatePersistPeriod: PT10M + maxRowsInMemory: 75000 + + properties: + task.partitions: 2 + task.replicants: 2 + topicPattern: wikipedia.* + topicPattern.priority: 1 + +properties: + zookeeper.connect: localhost:2181 + zookeeper.timeout: PT20S + druid.discovery.curator.path: /druid/discovery + druid.selectors.indexing.serviceName: druid/overlord + kafka.zookeeper.connect: localhost:2181 + kafka.group.id: tranquility-kafka + consumer.numThreads: 2 + commit.periodMillis: 15000 + reportDropsAsExceptions: false From 1b282db7f72184dd50c807f84b79c7c4580e8929 Mon Sep 17 00:00:00 2001 From: Brian O'Neill Date: Wed, 11 Oct 2017 09:08:25 -0400 Subject: [PATCH 4/4] mo' kinesis --- build.sbt | 4 +- 
.../kinesis/KinesisEventConsumer.java | 6 +-- .../model/PropertiesBasedKinesisConfig.java | 44 +++---------------- .../kinesis/KinesisBeamUtils.scala | 2 +- .../kinesis/KinesisConfigTest.java | 43 ++++++++++++++++++ kinesis/src/test/resources/kinesis.json | 3 +- kinesis/src/test/resources/kinesis.yaml | 3 +- 7 files changed, 56 insertions(+), 49 deletions(-) create mode 100644 kinesis/src/test/java/com/metamx/tranquility/kinesis/KinesisConfigTest.java diff --git a/build.sbt b/build.sbt index b336413..a924ca2 100644 --- a/build.sbt +++ b/build.sbt @@ -25,6 +25,7 @@ val jettyVersion = "9.2.5.v20141112" val apacheHttpVersion = "4.3.3" val kafkaVersion = "0.10.1.1" val airlineVersion = "0.7" +val amazonKinesisClientVersion = "1.8.1" def dependOnDruid(artifact: String) = { ("io.druid" % artifact % druidVersion @@ -141,7 +142,8 @@ val kinesisDependencies = Seq( exclude("org.slf4j", "slf4j-log4j12") exclude("log4j", "log4j") force(), - "io.airlift" % "airline" % airlineVersion + "io.airlift" % "airline" % airlineVersion, + "com.amazonaws" % "amazon-kinesis-client" % amazonKinesisClientVersion ) ++ loggingDependencies val coreTestDependencies = Seq( diff --git a/kinesis/src/main/java/com/metamx/tranquility/kinesis/KinesisEventConsumer.java b/kinesis/src/main/java/com/metamx/tranquility/kinesis/KinesisEventConsumer.java index 7e5eb95..2b83102 100644 --- a/kinesis/src/main/java/com/metamx/tranquility/kinesis/KinesisEventConsumer.java +++ b/kinesis/src/main/java/com/metamx/tranquility/kinesis/KinesisEventConsumer.java @@ -14,20 +14,18 @@ import com.amazonaws.services.kinesis.clientlibrary.lib.worker.KinesisClientLibConfiguration; import com.amazonaws.services.kinesis.clientlibrary.lib.worker.Worker; import com.amazonaws.services.kinesis.model.Record; -import com.amazonaws.services.kinesis.producer.KinesisProducerConfiguration; public class KinesisEventConsumer implements IRecordProcessorFactory { private static final Logger LOGGER = 
LoggerFactory.getLogger(KinesisEventConsumer.class); private Worker.Builder builder; - public KinesisEventConsumer(String propertiesFile, String streamName, String appName, String initialPosition) { - KinesisProducerConfiguration config = KinesisProducerConfiguration.fromPropertiesFile(propertiesFile); + public KinesisEventConsumer(String region, String streamName, String appName, String initialPosition) { InitialPositionInStream position = InitialPositionInStream.valueOf(initialPosition); KinesisClientLibConfiguration clientConfig = new KinesisClientLibConfiguration(appName, streamName, new DefaultAWSCredentialsProviderChain(), appName) - .withRegionName(config.getRegion()) + .withRegionName(region) .withInitialPositionInStream(position); this.builder = new Worker.Builder().recordProcessorFactory(this).config(clientConfig); diff --git a/kinesis/src/main/java/com/metamx/tranquility/kinesis/model/PropertiesBasedKinesisConfig.java b/kinesis/src/main/java/com/metamx/tranquility/kinesis/model/PropertiesBasedKinesisConfig.java index 4eff30d..c843bae 100644 --- a/kinesis/src/main/java/com/metamx/tranquility/kinesis/model/PropertiesBasedKinesisConfig.java +++ b/kinesis/src/main/java/com/metamx/tranquility/kinesis/model/PropertiesBasedKinesisConfig.java @@ -25,7 +25,7 @@ import org.skife.config.Default; /** - * Configuration object which extends Tranquility configuration with Kafka specific parameters. + * Configuration object which extends Tranquility configuration with Kinesis specific parameters. 
*/ public abstract class PropertiesBasedKinesisConfig extends PropertiesBasedConfig { @@ -33,46 +33,12 @@ public PropertiesBasedKinesisConfig() { super( ImmutableSet.of( - "kafka.group.id", - "kafka.zookeeper.connect", - "consumer.numThreads", - "commit.periodMillis" + "kinesis.aws.region" ) ); } - @Config("kafka.group.id") - @Default("tranquility-kafka") - public abstract String getKafkaGroupId(); - - @Config("kafka.zookeeper.connect") - public abstract String getKafkaZookeeperConnect(); - - @Config("consumer.numThreads") - @Default("-1") - public abstract Integer getConsumerNumThreads(); - - @Config("topicPattern") - @Default("(?!)") - public abstract String getTopicPattern(); - - @Config("useTopicAsDataSource") - @Default("false") - public abstract Boolean useTopicAsDataSource(); - - @Config("topicPattern.priority") - @Default("1") - public abstract Integer getTopicPatternPriority(); - - @Config("commit.periodMillis") - @Default("15000") - public abstract Integer getCommitPeriodMillis(); - - @Config("reportDropsAsExceptions") - @Default("false") - public abstract Boolean reportDropsAsExceptions(); - - @Config("reportParseExceptions") - @Default("false") - public abstract Boolean reportParseExceptions(); + @Config("kinesis.aws.region") + @Default("us-east-1") + public abstract String getKinesisAwsRegion(); } diff --git a/kinesis/src/main/scala/com/metamx/tranquility/kinesis/KinesisBeamUtils.scala b/kinesis/src/main/scala/com/metamx/tranquility/kinesis/KinesisBeamUtils.scala index 6b7612a..5172790 100644 --- a/kinesis/src/main/scala/com/metamx/tranquility/kinesis/KinesisBeamUtils.scala +++ b/kinesis/src/main/scala/com/metamx/tranquility/kinesis/KinesisBeamUtils.scala @@ -41,7 +41,7 @@ object KinesisBeamUtils .location( DruidLocation.create( config.propertiesBasedConfig.druidIndexingServiceName, - if (config.propertiesBasedConfig.useTopicAsDataSource) topic else config.dataSource + config.dataSource ) ) .curator(curator) diff --git 
a/kinesis/src/test/java/com/metamx/tranquility/kinesis/KinesisConfigTest.java b/kinesis/src/test/java/com/metamx/tranquility/kinesis/KinesisConfigTest.java new file mode 100644 index 0000000..d0d4ad4 --- /dev/null +++ b/kinesis/src/test/java/com/metamx/tranquility/kinesis/KinesisConfigTest.java @@ -0,0 +1,43 @@ +/* + * Licensed to Metamarkets Group Inc. (Metamarkets) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. Metamarkets licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. 
+ */ + +package com.metamx.tranquility.kinesis; + +import com.metamx.tranquility.kinesis.model.PropertiesBasedKinesisConfig; +import junit.framework.TestCase; +import org.junit.Test; +import org.skife.config.ConfigurationObjectFactory; + +import java.util.Properties; + + +public class KinesisConfigTest extends TestCase +{ +  @Test +  public void testConfig() throws Exception +  { +    Properties consumerProperties = new Properties(); +    consumerProperties.setProperty("kinesis.aws.region", "us-east-1"); + +    PropertiesBasedKinesisConfig config = new ConfigurationObjectFactory(consumerProperties).build( +        PropertiesBasedKinesisConfig.class); +    assertNotNull(config); +  } + +} diff --git a/kinesis/src/test/resources/kinesis.json b/kinesis/src/test/resources/kinesis.json index ab8fcc0..0e9808d 100644 --- a/kinesis/src/test/resources/kinesis.json +++ b/kinesis/src/test/resources/kinesis.json @@ -158,8 +158,7 @@ "zookeeper.timeout" : "PT20S", "druid.selectors.indexing.serviceName" : "druid/overlord", "druid.discovery.curator.path" : "/druid/discovery", - "kinesis.zookeeper.connect" : "localhost:2181", - "kafka.group.id" : "tranquility-kafka", + "kinesis.aws.region" : "us-east-1", "consumer.numThreads" : "2", "commit.periodMillis" : "15000", "reportDropsAsExceptions" : "false" diff --git a/kinesis/src/test/resources/kinesis.yaml b/kinesis/src/test/resources/kinesis.yaml index 17e5ea9..054bfb9 100644 --- a/kinesis/src/test/resources/kinesis.yaml +++ b/kinesis/src/test/resources/kinesis.yaml @@ -76,8 +76,7 @@ properties: zookeeper.timeout: PT20S druid.discovery.curator.path: /druid/discovery druid.selectors.indexing.serviceName: druid/overlord - kafka.zookeeper.connect: localhost:2181 - kafka.group.id: tranquility-kafka + kinesis.aws.region: us-east-1 consumer.numThreads: 2 commit.periodMillis: 15000 reportDropsAsExceptions: false