diff --git a/build.sbt b/build.sbt index 768c3ed..bf77042 100644 --- a/build.sbt +++ b/build.sbt @@ -25,6 +25,7 @@ val jettyVersion = "9.2.5.v20141112" val apacheHttpVersion = "4.3.3" val kafkaVersion = "0.10.1.1" val airlineVersion = "0.7" +val amazonKinesisClientVersion = "1.8.1" def dependOnDruid(artifact: String) = { ("io.druid" % artifact % druidVersion @@ -136,6 +137,15 @@ val kafkaDependencies = Seq( "io.airlift" % "airline" % airlineVersion ) ++ loggingDependencies +val kinesisDependencies = Seq( + "org.apache.kafka" %% "kafka" % kafkaVersion + exclude("org.slf4j", "slf4j-log4j12") + exclude("log4j", "log4j") + force(), + "io.airlift" % "airline" % airlineVersion, + "com.amazonaws" % "amazon-kinesis-client" % amazonKinesisClientVersion +) ++ loggingDependencies + val coreTestDependencies = Seq( "org.scalatest" %% "scalatest" % "2.2.5" % "test", dependOnDruid("druid-services") % "test", @@ -175,6 +185,10 @@ val kafkaTestDependencies = Seq( "org.easymock" % "easymock" % "3.4" % "test" ) +val kinesisTestDependencies = Seq( + "org.easymock" % "easymock" % "3.4" % "test" +) + lazy val commonSettings = Seq( organization := "io.druid", @@ -266,6 +280,13 @@ lazy val kafka = project.in(file("kafka")) .settings(publishArtifact in Test := false) .dependsOn(core % "test->test;compile->compile") +lazy val kinesis = project.in(file("kinesis")) + .settings(commonSettings: _*) + .settings(name := "tranquility-kinesis") + .settings(libraryDependencies ++= (kinesisDependencies ++ kinesisTestDependencies)) + .settings(publishArtifact in Test := false) + .dependsOn(core % "test->test;compile->compile") + lazy val distribution = project.in(file("distribution")) .settings(commonSettings: _*) .settings(name := "tranquility-distribution") @@ -274,4 +295,4 @@ lazy val distribution = project.in(file("distribution")) .settings(executableScriptName := "tranquility") .settings(bashScriptExtraDefines += """addJava "-Dlogback.configurationFile=${app_home}/../conf/logback.xml"""") .enablePlugins(JavaAppPackaging) - .dependsOn(kafka, server) + .dependsOn(kafka, kinesis, server) diff --git a/kinesis/src/main/java/com/metamx/tranquility/kinesis/KinesisConsumer.java b/kinesis/src/main/java/com/metamx/tranquility/kinesis/KinesisConsumer.java new file mode 100644 index 0000000..a7fd2f0 --- /dev/null +++ b/kinesis/src/main/java/com/metamx/tranquility/kinesis/KinesisConsumer.java @@ -0,0 +1,185 @@ +/* + * Licensed to Metamarkets Group Inc. (Metamarkets) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. Metamarkets licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package com.metamx.tranquility.kinesis; + +import com.google.common.base.Throwables; +import com.metamx.common.logger.Logger; +import com.metamx.tranquility.config.DataSourceConfig; +import com.metamx.tranquility.kafka.model.MessageCounters; +import com.metamx.tranquility.kinesis.model.PropertiesBasedKinesisConfig; +import com.metamx.tranquility.kafka.writer.WriterController; +import io.druid.concurrent.Execs; + + +import java.util.HashMap; +import java.util.Map; +import java.util.Properties; +import java.util.concurrent.ExecutorService; +import java.util.concurrent.atomic.AtomicBoolean; +import java.util.concurrent.locks.ReentrantReadWriteLock; + +/** + * TODO: Port this to Kinesis + */ +public class KinesisConsumer +{ + private static final Logger log = new Logger(KinesisConsumer.class); + + private final ExecutorService consumerExec; + private final Thread commitThread; + private final AtomicBoolean shutdown = new AtomicBoolean(); + + // prevents reading the next event from Kafka while events are being flushed and offset is being committed to ZK + private final ReentrantReadWriteLock commitLock = new ReentrantReadWriteLock(); + + private final int numThreads; + private final int commitMillis; + private final WriterController writerController; + + private Map previousMessageCounters = new HashMap<>(); + + public KinesisConsumer( + final PropertiesBasedKinesisConfig globalConfig, + final Properties kinesisProperties, + final Map> dataSourceConfigs, + final WriterController writerController + ) + { + int defaultNumThreads = Math.max(1, Runtime.getRuntime().availableProcessors() - 1); + this.numThreads = globalConfig.getConsumerNumThreads() > 0 + ? globalConfig.getConsumerNumThreads() + : defaultNumThreads; + + this.commitMillis = globalConfig.getCommitPeriodMillis(); + this.writerController = writerController; + this.consumerExec = Execs.multiThreaded(numThreads, "KafkaConsumer-%d"); + this.commitThread = new Thread(createCommitRunnable()); + this.commitThread.setName("KafkaConsumer-CommitThread"); + this.commitThread.setDaemon(true); + } + + public void start() + { + commitThread.start(); + // startConsumers(); + } + + public void stop() + { + if (shutdown.compareAndSet(false, true)) { + log.info("Shutting down - attempting to flush buffers and commit final offsets"); + + try { + commitLock.writeLock().lockInterruptibly(); // prevent Kafka from consuming any more events + try { + writerController.flushAll(); // try to flush the remaining events to Druid + writerController.stop(); + } + finally { + commitLock.writeLock().unlock(); + commitThread.interrupt(); + consumerExec.shutdownNow(); + } + } + catch (InterruptedException e) { + Thread.currentThread().interrupt(); + Throwables.propagate(e); + } + + log.info("Finished clean shutdown."); + } + } + + public void join() throws InterruptedException + { + commitThread.join(); + } + + void commit() throws InterruptedException + { + commitLock.writeLock().lockInterruptibly(); + try { + final long flushStartTime = System.currentTimeMillis(); + final Map messageCounters = writerController.flushAll(); // blocks until complete + + final long commitStartTime = System.currentTimeMillis(); + + final long finishedTime = System.currentTimeMillis(); + Map countsSinceLastCommit = new HashMap(); + for (Map.Entry entry : messageCounters.entrySet()) { + countsSinceLastCommit.put( + entry.getKey(), + entry.getValue().difference(previousMessageCounters.get(entry.getKey())) + ); + } + + previousMessageCounters = messageCounters; + + log.info( + "Flushed %s pending messages in %sms and committed offsets in %sms.", + countsSinceLastCommit.isEmpty() ? "0" : countsSinceLastCommit, + commitStartTime - flushStartTime, + finishedTime - commitStartTime + ); + } + finally { + commitLock.writeLock().unlock(); + } + } + + private Runnable createCommitRunnable() + { + return new Runnable() + { + @Override + public void run() + { + long lastFlushTime = System.currentTimeMillis(); + try { + while (!Thread.currentThread().isInterrupted()) { + Thread.sleep(Math.max(commitMillis - (System.currentTimeMillis() - lastFlushTime), 0)); + commit(); + lastFlushTime = System.currentTimeMillis(); + } + } + catch (InterruptedException e) { + Thread.currentThread().interrupt(); + log.info("Commit thread interrupted."); + } + catch (Throwable e) { + log.error(e, "Commit thread failed!"); + throw Throwables.propagate(e); + } + finally { + stop(); + } + } + }; + } + + private static String buildTopicFilter(Map> dataSourceConfigs) + { + StringBuilder topicFilter = new StringBuilder(); + for (Map.Entry> entry : dataSourceConfigs.entrySet()) { + topicFilter.append(String.format("(%s)|", entry.getValue().propertiesBasedConfig().getTopicPattern())); + } + + return topicFilter.length() > 0 ? topicFilter.substring(0, topicFilter.length() - 1) : ""; + } +} diff --git a/kinesis/src/main/java/com/metamx/tranquility/kinesis/KinesisEventConsumer.java b/kinesis/src/main/java/com/metamx/tranquility/kinesis/KinesisEventConsumer.java new file mode 100644 index 0000000..2b83102 --- /dev/null +++ b/kinesis/src/main/java/com/metamx/tranquility/kinesis/KinesisEventConsumer.java @@ -0,0 +1,62 @@ +package com.metamx.tranquility.kinesis; + +import java.util.List; + +import com.amazonaws.services.kinesis.clientlibrary.lib.worker.ShutdownReason; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import com.amazonaws.auth.DefaultAWSCredentialsProviderChain; +import com.amazonaws.services.kinesis.clientlibrary.interfaces.IRecordProcessor; +import com.amazonaws.services.kinesis.clientlibrary.interfaces.IRecordProcessorCheckpointer; +import com.amazonaws.services.kinesis.clientlibrary.interfaces.IRecordProcessorFactory; +import com.amazonaws.services.kinesis.clientlibrary.lib.worker.InitialPositionInStream; +import com.amazonaws.services.kinesis.clientlibrary.lib.worker.KinesisClientLibConfiguration; +import com.amazonaws.services.kinesis.clientlibrary.lib.worker.Worker; +import com.amazonaws.services.kinesis.model.Record; + +public class KinesisEventConsumer implements IRecordProcessorFactory { + private static final Logger LOGGER = LoggerFactory.getLogger(KinesisEventConsumer.class); + private Worker.Builder builder; + + public KinesisEventConsumer(String region, String streamName, String appName, String initialPosition) { + + InitialPositionInStream position = InitialPositionInStream.valueOf(initialPosition); + + KinesisClientLibConfiguration clientConfig = new KinesisClientLibConfiguration(appName, streamName, + new DefaultAWSCredentialsProviderChain(), appName) + .withRegionName(region) + .withInitialPositionInStream(position); + + this.builder = new Worker.Builder().recordProcessorFactory(this).config(clientConfig); + } + + public void start(){ + this.builder.build().run(); + } + + @Override + public IRecordProcessor createProcessor() { + LOGGER.debug("Creating recordProcessor."); + return new IRecordProcessor() { + @Override + public void initialize(String shardId) {} + + @Override + public void processRecords(List records, IRecordProcessorCheckpointer checkpointer) { + for (Record record : records){ + byte[] bytes = new byte[record.getData().remaining()]; + record.getData().get(bytes); + String data = new String(bytes); + LOGGER.debug("Received [{}]", data); + } + } + + @Override + public void shutdown(IRecordProcessorCheckpointer checkpointer, ShutdownReason reason) { + LOGGER.debug("Shutting down [{}]"); + } + }; + + } +} diff --git a/kinesis/src/main/java/com/metamx/tranquility/kinesis/KinesisMain.java b/kinesis/src/main/java/com/metamx/tranquility/kinesis/KinesisMain.java new file mode 100644 index 0000000..a36c819 --- /dev/null +++ b/kinesis/src/main/java/com/metamx/tranquility/kinesis/KinesisMain.java @@ -0,0 +1,111 @@ +package com.metamx.tranquility.kinesis; + +import com.google.common.base.Throwables; +import com.google.common.collect.Maps; +import com.metamx.common.logger.Logger; +import com.metamx.tranquility.config.DataSourceConfig; +import com.metamx.tranquility.config.TranquilityConfig; +import com.metamx.tranquility.kinesis.model.PropertiesBasedKinesisConfig; +import io.airlift.airline.Help; +import io.airlift.airline.HelpOption; +import io.airlift.airline.Option; +import io.airlift.airline.SingleCommand; +import javax.inject.Inject; +import java.io.FileInputStream; +import java.io.IOException; +import java.io.InputStream; +import java.util.Map; +import java.util.Properties; + +public class KinesisMain +{ + + private static final Logger log = new Logger(KinesisMain.class); + + @Inject + public HelpOption helpOption; + + @Option(name = {"-f", "-configFile"}, description = "Path to configuration property file") + public String propertiesFile; + + public static void main(String[] args) throws Exception + { + KinesisMain main; + try { + main = SingleCommand.singleCommand(KinesisMain.class).parse(args); + } + catch (Exception e) { + log.error(e, "Exception parsing arguments"); + Help.help(SingleCommand.singleCommand(KinesisMain.class).getCommandMetadata()); + return; + } + + if (main.helpOption.showHelpIfRequested()) { + return; + } + + main.run(); + } + + public void run() throws InterruptedException + { + if (propertiesFile == null || propertiesFile.isEmpty()) { + helpOption.help = true; + helpOption.showHelpIfRequested(); + + log.warn("Missing required parameters, aborting."); + return; + } + + TranquilityConfig config = null; + try (InputStream in = new FileInputStream(propertiesFile)) { + config = TranquilityConfig.read(in, PropertiesBasedKinesisConfig.class); + } + catch (IOException e) { + log.error("Could not read config file: %s, aborting.", propertiesFile); + Throwables.propagate(e); + } + + PropertiesBasedKinesisConfig globalConfig = config.globalConfig(); + Map> dataSourceConfigs = Maps.newHashMap(); + for (String dataSource : config.getDataSources()) { + dataSourceConfigs.put(dataSource, config.getDataSource(dataSource)); + } + + // find all properties that start with 'kafka.' and pass them on to Kafka + final Properties kafkaProperties = new Properties(); + for (String propertyName : config.globalConfig().properties().stringPropertyNames()) { + if (propertyName.startsWith("kafka.")) { + kafkaProperties.setProperty( + propertyName.replaceFirst("kafka\\.", ""), + config.globalConfig().properties().getProperty(propertyName) + ); + } + } + + // set the critical Kafka configs again from TranquilityKafkaConfig so it picks up the defaults + kafkaProperties.setProperty("group.id", globalConfig.getKafkaGroupId()); + kafkaProperties.setProperty("zookeeper.connect", globalConfig.getKafkaZookeeperConnect()); + if (kafkaProperties.setProperty( + "zookeeper.session.timeout.ms", + Long.toString(globalConfig.zookeeperTimeout().toStandardDuration().getMillis()) + ) != null) { + throw new IllegalArgumentException( + "Set zookeeper.timeout instead of setting kafka.zookeeper.session.timeout.ms" + ); + } + + Runtime.getRuntime().addShutdownHook( + new Thread( + new Runnable() + { + @Override + public void run() + { + log.info("Initiating shutdown..."); + } + } + ) + ); + } +} diff --git a/kinesis/src/main/java/com/metamx/tranquility/kinesis/model/MessageCounters.java b/kinesis/src/main/java/com/metamx/tranquility/kinesis/model/MessageCounters.java new file mode 100644 index 0000000..d80ef31 --- /dev/null +++ b/kinesis/src/main/java/com/metamx/tranquility/kinesis/model/MessageCounters.java @@ -0,0 +1,81 @@ +/* + * Licensed to Metamarkets Group Inc. (Metamarkets) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. Metamarkets licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +package com.metamx.tranquility.kinesis.model; + +/** + * Used for passing received, sent, and failed message counts from SimpleTranquilizerAdapter. + */ +public class MessageCounters +{ + private final long receivedCount, sentCount, droppedCount, unparseableCount; + + public MessageCounters(long receivedCount, long sentCount, long droppedCount, long unparseableCount) + { + this.receivedCount = receivedCount; + this.sentCount = sentCount; + this.droppedCount = droppedCount; + this.unparseableCount = unparseableCount; + } + + public long getReceivedCount() + { + return this.receivedCount; + } + + public long getSentCount() + { + return this.sentCount; + } + + public long getDroppedCount() + { + return droppedCount; + } + + public long getUnparseableCount() + { + return unparseableCount; + } + + public MessageCounters difference(MessageCounters subtrahend) + { + if (subtrahend == null) { + return this; + } + + return new MessageCounters( + this.receivedCount - subtrahend.getReceivedCount(), + this.sentCount - subtrahend.getSentCount(), + this.droppedCount - subtrahend.getDroppedCount(), + this.unparseableCount - subtrahend.getUnparseableCount() + ); + } + + @Override + public String toString() + { + return "{" + + "receivedCount=" + receivedCount + + ", sentCount=" + sentCount + + ", droppedCount=" + droppedCount + + ", unparseableCount=" + unparseableCount + + '}'; + } +} diff --git a/kinesis/src/main/java/com/metamx/tranquility/kinesis/model/PropertiesBasedKinesisConfig.java b/kinesis/src/main/java/com/metamx/tranquility/kinesis/model/PropertiesBasedKinesisConfig.java new file mode 100644 index 0000000..c843bae --- /dev/null +++ b/kinesis/src/main/java/com/metamx/tranquility/kinesis/model/PropertiesBasedKinesisConfig.java @@ -0,0 +1,44 @@ +/* + * Licensed to Metamarkets Group Inc. (Metamarkets) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. Metamarkets licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +package com.metamx.tranquility.kinesis.model; + +import com.google.common.collect.ImmutableSet; +import com.metamx.tranquility.config.PropertiesBasedConfig; +import org.skife.config.Config; +import org.skife.config.Default; + +/** + * Configuration object which extends Tranquility configuration with Kinesis specific parameters. + */ +public abstract class PropertiesBasedKinesisConfig extends PropertiesBasedConfig +{ + public PropertiesBasedKinesisConfig() + { + super( + ImmutableSet.of( + "kinesis.aws.region" + ) + ); + } + + @Config("kinesis.aws.region") + @Default("tranquility-kinesis") + public abstract String getKinesisAwsRegion(); +} diff --git a/kinesis/src/main/java/com/metamx/tranquility/kinesis/writer/TranquilityEventWriter.java b/kinesis/src/main/java/com/metamx/tranquility/kinesis/writer/TranquilityEventWriter.java new file mode 100644 index 0000000..9ca8f9b --- /dev/null +++ b/kinesis/src/main/java/com/metamx/tranquility/kinesis/writer/TranquilityEventWriter.java @@ -0,0 +1,142 @@ +/* + * Licensed to Metamarkets Group Inc. (Metamarkets) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. Metamarkets licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package com.metamx.tranquility.kinesis.writer; + +import com.fasterxml.jackson.databind.ObjectMapper; +import com.google.common.base.Throwables; +import com.metamx.common.logger.Logger; +import com.metamx.common.parsers.ParseException; +import com.metamx.tranquility.config.DataSourceConfig; +import com.metamx.tranquility.finagle.FinagleRegistry; +import com.metamx.tranquility.kinesis.KinesisBeamUtils; +import com.metamx.tranquility.kinesis.model.MessageCounters; +import com.metamx.tranquility.kinesis.model.PropertiesBasedKinesisConfig; +import com.metamx.tranquility.kinesis.model.PropertiesBasedKinesisConfig; +import com.metamx.tranquility.tranquilizer.MessageDroppedException; +import com.metamx.tranquility.tranquilizer.Tranquilizer; +import com.twitter.util.FutureEventListener; +import org.apache.curator.framework.CuratorFramework; +import scala.runtime.BoxedUnit; + +import java.util.concurrent.atomic.AtomicLong; +import java.util.concurrent.atomic.AtomicReference; + +/** + * Pushes events to Druid through Tranquility using the SimpleTranquilizerAdapter. + */ +public class TranquilityEventWriter +{ + private static final Logger log = new Logger(TranquilityEventWriter.class); + private static final ObjectMapper MAPPER = new ObjectMapper(); + + private final DataSourceConfig dataSourceConfig; + private final Tranquilizer tranquilizer; + + private final AtomicLong receivedCounter = new AtomicLong(); + private final AtomicLong sentCounter = new AtomicLong(); + private final AtomicLong droppedCounter = new AtomicLong(); + private final AtomicLong unparseableCounter = new AtomicLong(); + private final AtomicReference exception = new AtomicReference<>(); + + public TranquilityEventWriter( + String topic, + DataSourceConfig dataSourceConfig, + CuratorFramework curator, + FinagleRegistry finagleRegistry + ) + { + this.dataSourceConfig = dataSourceConfig; + this.tranquilizer = KinesisBeamUtils.createTranquilizer( + topic, + dataSourceConfig, + curator, + finagleRegistry + ); + this.tranquilizer.start(); + } + + public void send(byte[] message) throws InterruptedException + { + receivedCounter.incrementAndGet(); + tranquilizer.send(message).addEventListener( + new FutureEventListener() + { + @Override + public void onSuccess(BoxedUnit value) + { + sentCounter.incrementAndGet(); + } + + @Override + public void onFailure(Throwable cause) + { + if (cause instanceof MessageDroppedException) { + droppedCounter.incrementAndGet(); + if (!dataSourceConfig.propertiesBasedConfig().reportDropsAsExceptions()) { + return; + } + } else if (cause instanceof ParseException) { + unparseableCounter.incrementAndGet(); + if (!dataSourceConfig.propertiesBasedConfig().reportParseExceptions()) { + return; + } + } + + exception.compareAndSet(null, cause); + } + } + ); + + maybeThrow(); + } + + public void flush() throws InterruptedException + { + tranquilizer.flush(); + maybeThrow(); + } + + public void stop() + { + try { + tranquilizer.stop(); + } + catch (IllegalStateException e) { + log.info(e, "Exception while stopping Tranquility"); + } + } + + public MessageCounters getMessageCounters() + { + return new MessageCounters( + receivedCounter.get(), + sentCounter.get(), + droppedCounter.get(), + unparseableCounter.get() + ); + } + + private void maybeThrow() + { + final Throwable e = exception.get(); + if (e != null) { + throw Throwables.propagate(e); + } + } +} diff --git a/kinesis/src/main/java/com/metamx/tranquility/kinesis/writer/WriterController.java b/kinesis/src/main/java/com/metamx/tranquility/kinesis/writer/WriterController.java new file mode 100644 index 0000000..98b7e88 --- /dev/null +++ b/kinesis/src/main/java/com/metamx/tranquility/kinesis/writer/WriterController.java @@ -0,0 +1,175 @@ +/* + * Licensed to Metamarkets Group Inc. (Metamarkets) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. Metamarkets licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package com.metamx.tranquility.kinesis.writer; + +import com.google.common.primitives.Ints; +import com.metamx.common.logger.Logger; +import com.metamx.common.scala.net.curator.Disco; +import com.metamx.tranquility.config.DataSourceConfig; +import com.metamx.tranquility.finagle.FinagleRegistry; +import com.metamx.tranquility.finagle.FinagleRegistryConfig; +import com.metamx.tranquility.kafka.model.MessageCounters; +import com.metamx.tranquility.kinesis.model.PropertiesBasedKinesisConfig; +import com.metamx.tranquility.kinesis.writer.TranquilityEventWriter; +import org.apache.curator.RetryPolicy; +import org.apache.curator.framework.CuratorFramework; +import org.apache.curator.framework.CuratorFrameworkFactory; +import org.apache.curator.retry.ExponentialBackoffRetry; + +import java.util.ArrayList; +import java.util.Collections; +import java.util.Comparator; +import java.util.HashMap; +import java.util.List; +import java.util.Map; +import java.util.concurrent.ConcurrentHashMap; +import java.util.regex.Pattern; + +/** + * Manages the creation and operation of TranquilityEventWriters. + */ +public class WriterController +{ + private static final Logger log = new Logger(WriterController.class); + private static final RetryPolicy RETRY_POLICY = new ExponentialBackoffRetry(1000, 500, 30000); + + private List> dataSourceConfigList; + private Map writers = new ConcurrentHashMap<>(); + private Map curators = new ConcurrentHashMap<>(); + private Map finagleRegistries = new ConcurrentHashMap<>(); + + public WriterController(Map> dataSourceConfigs) + { + this.dataSourceConfigList = new ArrayList<>(dataSourceConfigs.values()); + Collections.sort( + dataSourceConfigList, + new Comparator>() + { + @Override + public int compare( + DataSourceConfig o1, + DataSourceConfig o2 + ) + { + return o2.propertiesBasedConfig() + .getTopicPatternPriority() + .compareTo(o1.propertiesBasedConfig().getTopicPatternPriority()); + } + } + ); + + log.info("Ready: [topicPattern] -> dataSource mappings:"); + for (DataSourceConfig dataSourceConfig : this.dataSourceConfigList) { + log.info( + " [%s] -> %s (priority: %d)", + dataSourceConfig.propertiesBasedConfig().getTopicPattern(), + dataSourceConfig.dataSource(), + dataSourceConfig.propertiesBasedConfig().getTopicPatternPriority() + ); + } + } + + public synchronized TranquilityEventWriter getWriter(String topic) + { + if (!writers.containsKey(topic)) { + // create a EventWriter using the spec mapped to the first matching topicPattern + for (DataSourceConfig dataSourceConfig : dataSourceConfigList) { + if (Pattern.matches(dataSourceConfig.propertiesBasedConfig().getTopicPattern(), topic)) { + log.info( + "Creating EventWriter for topic [%s] using dataSource [%s]", + topic, + dataSourceConfig.dataSource() + ); + writers.put(topic, createWriter(topic, dataSourceConfig)); + return writers.get(topic); + } + } + + throw new RuntimeException(String.format("Kafka topicFilter allowed topic [%s] but no spec is mapped", topic)); + } + + return writers.get(topic); + } + + public Map flushAll() throws InterruptedException + { + Map flushedEvents = new HashMap<>(); + for (Map.Entry entry : writers.entrySet()) { + entry.getValue().flush(); + flushedEvents.put(entry.getKey(), entry.getValue().getMessageCounters()); + } + + return flushedEvents; + } + + public void stop() + { + for (Map.Entry entry : writers.entrySet()) { + entry.getValue().stop(); + } + + for (Map.Entry entry : curators.entrySet()) { + entry.getValue().close(); + } + } + + protected TranquilityEventWriter createWriter( + String topic, + DataSourceConfig dataSourceConfig + ) + { + final String curatorKey = dataSourceConfig.propertiesBasedConfig().zookeeperConnect(); + if (!curators.containsKey(curatorKey)) { + final int zkTimeout = Ints.checkedCast( + dataSourceConfig.propertiesBasedConfig() + .zookeeperTimeout() + .toStandardDuration() + .getMillis() + ); + + final CuratorFramework curator = CuratorFrameworkFactory.builder() + .connectString( + dataSourceConfig.propertiesBasedConfig() + .zookeeperConnect() + ) + .connectionTimeoutMs(zkTimeout) + .retryPolicy(RETRY_POLICY) + .build(); + curator.start(); + curators.put(curatorKey, curator); + } + + final String finagleKey = String.format("%s:%s", curatorKey, dataSourceConfig.propertiesBasedConfig().discoPath()); + if (!finagleRegistries.containsKey(finagleKey)) { + finagleRegistries.put( + finagleKey, new FinagleRegistry( + FinagleRegistryConfig.builder().build(), + new Disco(curators.get(curatorKey), dataSourceConfig.propertiesBasedConfig()) + ) + ); + } + + return new TranquilityEventWriter( + topic, + dataSourceConfig, + curators.get(curatorKey), + finagleRegistries.get(finagleKey) + ); + } +} diff --git a/kinesis/src/main/scala/com/metamx/tranquility/kinesis/KinesisBeamUtils.scala b/kinesis/src/main/scala/com/metamx/tranquility/kinesis/KinesisBeamUtils.scala new file mode 100644 index 0000000..5172790 --- /dev/null +++ b/kinesis/src/main/scala/com/metamx/tranquility/kinesis/KinesisBeamUtils.scala @@ -0,0 +1,51 @@ +/* + * Licensed to Metamarkets Group Inc. (Metamarkets) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. Metamarkets licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +package com.metamx.tranquility.kinesis + +import com.metamx.tranquility.config.DataSourceConfig +import com.metamx.tranquility.druid.DruidBeams +import com.metamx.tranquility.druid.DruidLocation +import com.metamx.tranquility.finagle.FinagleRegistry +import com.metamx.tranquility.kinesis.model.PropertiesBasedKinesisConfig +import com.metamx.tranquility.tranquilizer.Tranquilizer +import org.apache.curator.framework.CuratorFramework +import scala.reflect.runtime.universe.typeTag + +object KinesisBeamUtils +{ + def createTranquilizer( + topic: String, + config: DataSourceConfig[PropertiesBasedKinesisConfig], + curator: CuratorFramework, + finagleRegistry: FinagleRegistry + ): Tranquilizer[Array[Byte]] = + { + DruidBeams.fromConfig(config, typeTag[Array[Byte]]) + .location( + DruidLocation.create( + config.propertiesBasedConfig.druidIndexingServiceName, + config.dataSource + ) + ) + .curator(curator) + .finagleRegistry(finagleRegistry) + .buildTranquilizer(config.tranquilizerBuilder()) + } +} diff --git a/kinesis/src/test/java/com/metamx/tranquility/kinesis/KinesisConfigTest.java b/kinesis/src/test/java/com/metamx/tranquility/kinesis/KinesisConfigTest.java new file mode 100644 index 0000000..d0d4ad4 --- /dev/null +++ b/kinesis/src/test/java/com/metamx/tranquility/kinesis/KinesisConfigTest.java @@ -0,0 +1,43 @@ +/* + * Licensed to Metamarkets Group Inc. (Metamarkets) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. Metamarkets licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +package com.metamx.tranquility.kinesis; + +import com.metamx.tranquility.kinesis.model.PropertiesBasedKinesisConfig; +import junit.framework.TestCase; +import org.junit.Test; +import org.skife.config.ConfigurationObjectFactory; + +import java.util.Properties; + + +public class KinesisConfigTest extends TestCase +{ + @Test + public void testConfig() throws Exception + { + Properties consumerProperties = new Properties(); + consumerProperties.setProperty("kinesis.aws.region", "us-east-1"); + + PropertiesBasedKinesisConfig config = new ConfigurationObjectFactory(consumerProperties).build( + PropertiesBasedKinesisConfig.class); + + } + +} diff --git a/kinesis/src/test/java/com/metamx/tranquility/kinesis/KinesisConsumerTest.java b/kinesis/src/test/java/com/metamx/tranquility/kinesis/KinesisConsumerTest.java new file mode 100644 index 0000000..ee02118 --- /dev/null +++ b/kinesis/src/test/java/com/metamx/tranquility/kinesis/KinesisConsumerTest.java @@ -0,0 +1,44 @@ +/* + * Licensed to Metamarkets Group Inc. (Metamarkets) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. Metamarkets licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +package com.metamx.tranquility.kinesis; + +import org.junit.AfterClass; +import org.junit.BeforeClass; +import org.junit.Test; + +public class KinesisConsumerTest +{ + + @BeforeClass + public static void setUpBeforeClass() throws Exception + { + } + + @AfterClass + public static void tearDownAfterClass() throws Exception + { + } + + @Test(timeout = 60_000L) + public void testConsumer() throws Exception + { + + } +} diff --git a/kinesis/src/test/resources/kinesis.json b/kinesis/src/test/resources/kinesis.json new file mode 100644 index 0000000..0e9808d --- /dev/null +++ b/kinesis/src/test/resources/kinesis.json @@ -0,0 +1,166 @@ +{ + "dataSources" : [ + { + "spec" : { + "dataSchema" : { + "parser" : { + "type" : "string", + "parseSpec" : { + "timestampSpec" : { + "format" : "auto", + "column" : "timestamp" + }, + "dimensionsSpec" : { + "spatialDimensions" : [ + { + "dims" : [ + "lat", + "lon" + ], + "dimName" : "geo" + } + ], + "dimensions" : [ + "text", + "hashtags", + "lat", + "lon", + "source", + "retweet", + "lang", + "utc_offset", + "screen_name", + "verified" + ] + }, + "format" : "json" + } + }, + "dataSource" : "twitter", + "granularitySpec" : { + "segmentGranularity" : "hour", + "type" : "uniform", + "queryGranularity" : "none" + }, + "metricsSpec" : [ + { + "type" : "count", + "name" : "tweets" + }, + { + "fieldName" : "followers", + "type" : "longSum", + "name" : "followers" + }, + { + "name" : "retweets", + "type" : "longSum", + "fieldName" : "retweets" + }, + { + "fieldName" : "friends", + "type" : "longSum", + "name" : "friends" + }, + { + "name" : "statuses", + "type" : "longSum", + "fieldName" : "statuses" + } + ] + }, + "tuningConfig" : { + "maxRowsInMemory" : "100000", + "type" : "realtime", + "windowPeriod" : "PT10M", + "intermediatePersistPeriod" : "PT10M" + } + }, + "properties" : { + "topicPattern.priority" : "1", + "topicPattern" : "twitter" + } + }, + { + "spec" : { + "dataSchema" : { + "granularitySpec" : { + "queryGranularity" : "none", + "type" : "uniform", + "segmentGranularity" : "hour" + }, + "dataSource" : "wikipedia", + "parser" : { + "type" : "string", + "parseSpec" : { + "timestampSpec" : { + "format" : "auto", + "column" : "timestamp" + }, + "format" : "json", + "dimensionsSpec" : { + "dimensions" : [ + "page", + "language", + "user", + "unpatrolled", + "newPage", + "robot", + "anonymous", + "namespace", + "continent", + "country", + "region", + "city" + ] + } + } + }, + "metricsSpec" : [ + { + "type" : "count", + "name" : "count" + }, + { + "type" : "doubleSum", + "name" : "added", + "fieldName" : "added" + }, + { + "name" : "deleted", + "type" : "doubleSum", + "fieldName" : "deleted" + }, + { + "name" : "delta", + "type" : "doubleSum", + "fieldName" : "delta" + } + ] + }, + "tuningConfig" : { + "type" : "realtime", + "intermediatePersistPeriod" : "PT10M", + "windowPeriod" : "PT10M", + "maxRowsInMemory" : 75000 + } + }, + "properties" : { + "task.partitions" : "2", + "task.replicants" : "2", + "topicPattern" : "wikipedia.*", + "topicPattern.priority" : "1" + } + } + ], + "properties" : { + "zookeeper.connect" : "localhost:2181", + "zookeeper.timeout" : "PT20S", + "druid.selectors.indexing.serviceName" : "druid/overlord", + "druid.discovery.curator.path" : "/druid/discovery", + "kinesis.aws.region" : "us-east-1", + "consumer.numThreads" : "2", + "commit.periodMillis" : "15000", + "reportDropsAsExceptions" : "false" + } +} diff --git a/kinesis/src/test/resources/kinesis.yaml b/kinesis/src/test/resources/kinesis.yaml new file mode 100644 index 0000000..054bfb9 --- /dev/null +++ b/kinesis/src/test/resources/kinesis.yaml @@ -0,0 +1,82 @@ +dataSources: + - spec: + dataSchema: + dataSource: twitter + parser: + type: string + parseSpec: + format: json + timestampSpec: + column: timestamp + format: auto + dimensionsSpec: + dimensions: [text, hashtags, lat, lon, source, retweet, lang, utc_offset, screen_name, verified] + spatialDimensions: [ { dimName: geo, dims: [lat, lon]} ] + + metricsSpec: + - { type: count, name: tweets } + - { type: longSum, name: followers, fieldName: followers } + - { type: longSum, name: retweets, fieldName: retweets } + - { type: longSum, name: friends, fieldName: friends } + - { type: longSum, name: statuses, fieldName: statuses } + + granularitySpec: + type: uniform + segmentGranularity: hour + queryGranularity: none + + tuningConfig: + type: realtime + maxRowsInMemory: 100000 + intermediatePersistPeriod: PT10M + windowPeriod: PT10M + + properties: + topicPattern: twitter + topicPattern.priority: 1 + + - spec: + dataSchema: + dataSource: wikipedia + parser: + type: string + parseSpec: + format: json + timestampSpec: + column: timestamp + format: auto + dimensionsSpec: + dimensions: [page, language, user, unpatrolled, newPage, robot, anonymous, namespace, continent, country, region, city] + + metricsSpec: + - { type: count, name: count } + - { type: doubleSum, name: added, fieldName: added } + - { type: doubleSum, name: deleted, fieldName: deleted } + - { type: doubleSum, name: delta, fieldName: delta } + + granularitySpec: + type: uniform + segmentGranularity: hour + queryGranularity: none + + tuningConfig: + type: realtime + windowPeriod: PT10M + intermediatePersistPeriod: PT10M + maxRowsInMemory: 75000 + + properties: + task.partitions: 2 + task.replicants: 2 + topicPattern: wikipedia.* + topicPattern.priority: 1 + +properties: + zookeeper.connect: localhost:2181 + zookeeper.timeout: PT20S + druid.discovery.curator.path: /druid/discovery + druid.selectors.indexing.serviceName: druid/overlord + kinesis.aws.region : us-east-1 + consumer.numThreads: 2 + commit.periodMillis: 15000 + reportDropsAsExceptions: false