WIP : DNM : Kinesis Integration #254

Open · wants to merge 5 commits into master
23 changes: 22 additions & 1 deletion build.sbt
@@ -25,6 +25,7 @@ val jettyVersion = "9.2.5.v20141112"
val apacheHttpVersion = "4.3.3"
val kafkaVersion = "0.10.1.1"
val airlineVersion = "0.7"
val amazonKinesisClientVersion = "1.8.1"

def dependOnDruid(artifact: String) = {
("io.druid" % artifact % druidVersion
@@ -136,6 +137,15 @@ val kafkaDependencies = Seq(
"io.airlift" % "airline" % airlineVersion
) ++ loggingDependencies

val kinesisDependencies = Seq(
"org.apache.kafka" %% "kafka" % kafkaVersion
exclude("org.slf4j", "slf4j-log4j12")
exclude("log4j", "log4j")
force(),
"io.airlift" % "airline" % airlineVersion,
"com.amazonaws" % "amazon-kinesis-client" % amazonKinesisClientVersion
) ++ loggingDependencies

val coreTestDependencies = Seq(
"org.scalatest" %% "scalatest" % "2.2.5" % "test",
dependOnDruid("druid-services") % "test",
@@ -175,6 +185,10 @@ val kafkaTestDependencies = Seq(
"org.easymock" % "easymock" % "3.4" % "test"
)

val kinesisTestDependencies = Seq(
"org.easymock" % "easymock" % "3.4" % "test"
)

lazy val commonSettings = Seq(
organization := "io.druid",

@@ -266,6 +280,13 @@ lazy val kafka = project.in(file("kafka"))
.settings(publishArtifact in Test := false)
.dependsOn(core % "test->test;compile->compile")

lazy val kinesis = project.in(file("kinesis"))
  .settings(commonSettings: _*)
  .settings(name := "tranquility-kinesis")
  .settings(libraryDependencies ++= (kinesisDependencies ++ kinesisTestDependencies))
  .settings(publishArtifact in Test := false)
  // KinesisConsumer currently reuses WriterController and MessageCounters from the kafka module,
  // so the kinesis project needs a dependency on it to compile
  .dependsOn(core % "test->test;compile->compile", kafka)

lazy val distribution = project.in(file("distribution"))
.settings(commonSettings: _*)
.settings(name := "tranquility-distribution")
@@ -274,4 +295,4 @@ lazy val distribution = project.in(file("distribution"))
.settings(executableScriptName := "tranquility")
.settings(bashScriptExtraDefines += """addJava "-Dlogback.configurationFile=${app_home}/../conf/logback.xml"""")
.enablePlugins(JavaAppPackaging)
-  .dependsOn(kafka, server)
+  .dependsOn(kafka, kinesis, server)
new file: KinesisConsumer.java
@@ -0,0 +1,185 @@
/*
* Licensed to Metamarkets Group Inc. (Metamarkets) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. Metamarkets licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/
package com.metamx.tranquility.kinesis;

import com.google.common.base.Throwables;
import com.metamx.common.logger.Logger;
import com.metamx.tranquility.config.DataSourceConfig;
import com.metamx.tranquility.kafka.model.MessageCounters;
import com.metamx.tranquility.kinesis.model.PropertiesBasedKinesisConfig;
import com.metamx.tranquility.kafka.writer.WriterController;
import io.druid.concurrent.Execs;


import java.util.HashMap;
import java.util.Map;
import java.util.Properties;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.locks.ReentrantReadWriteLock;

/**
* TODO: Port this to Kinesis
*/
public class KinesisConsumer
{
private static final Logger log = new Logger(KinesisConsumer.class);

private final ExecutorService consumerExec;
private final Thread commitThread;
private final AtomicBoolean shutdown = new AtomicBoolean();

  // prevents reading the next event while events are being flushed and offsets are committed; carried over
  // from the Kafka consumer, where the offset goes to ZK (the Kinesis port will checkpoint via the KCL instead)
private final ReentrantReadWriteLock commitLock = new ReentrantReadWriteLock();

private final int numThreads;
private final int commitMillis;
private final WriterController writerController;

private Map<String, MessageCounters> previousMessageCounters = new HashMap<>();

public KinesisConsumer(
final PropertiesBasedKinesisConfig globalConfig,
final Properties kinesisProperties,
final Map<String, DataSourceConfig<PropertiesBasedKinesisConfig>> dataSourceConfigs,
final WriterController writerController
)
{
int defaultNumThreads = Math.max(1, Runtime.getRuntime().availableProcessors() - 1);
this.numThreads = globalConfig.getConsumerNumThreads() > 0
? globalConfig.getConsumerNumThreads()
: defaultNumThreads;

this.commitMillis = globalConfig.getCommitPeriodMillis();
this.writerController = writerController;
    this.consumerExec = Execs.multiThreaded(numThreads, "KinesisConsumer-%d");
    this.commitThread = new Thread(createCommitRunnable());
    this.commitThread.setName("KinesisConsumer-CommitThread");
this.commitThread.setDaemon(true);
}

public void start()
{
commitThread.start();
    // TODO: start the shard consumers once ported; the Kafka version calls startConsumers() here
}

public void stop()
{
if (shutdown.compareAndSet(false, true)) {
log.info("Shutting down - attempting to flush buffers and commit final offsets");

try {
commitLock.writeLock().lockInterruptibly(); // prevent Kafka from consuming any more events
try {
writerController.flushAll(); // try to flush the remaining events to Druid
writerController.stop();
}
finally {
commitLock.writeLock().unlock();
commitThread.interrupt();
consumerExec.shutdownNow();
}
}
catch (InterruptedException e) {
Thread.currentThread().interrupt();
        throw Throwables.propagate(e);
}

log.info("Finished clean shutdown.");
}
}

public void join() throws InterruptedException
{
commitThread.join();
}

void commit() throws InterruptedException
{
commitLock.writeLock().lockInterruptibly();
try {
final long flushStartTime = System.currentTimeMillis();
final Map<String, MessageCounters> messageCounters = writerController.flushAll(); // blocks until complete

      final long commitStartTime = System.currentTimeMillis();
      // TODO: checkpoint consumed records here; the Kafka version commits offsets at this point

      final long finishedTime = System.currentTimeMillis();
      Map<String, MessageCounters> countsSinceLastCommit = new HashMap<>();
for (Map.Entry<String, MessageCounters> entry : messageCounters.entrySet()) {
countsSinceLastCommit.put(
entry.getKey(),
entry.getValue().difference(previousMessageCounters.get(entry.getKey()))
);
}

previousMessageCounters = messageCounters;

log.info(
"Flushed %s pending messages in %sms and committed offsets in %sms.",
countsSinceLastCommit.isEmpty() ? "0" : countsSinceLastCommit,
commitStartTime - flushStartTime,
finishedTime - commitStartTime
);
}
finally {
commitLock.writeLock().unlock();
}
}

private Runnable createCommitRunnable()
{
return new Runnable()
{
@Override
public void run()
{
long lastFlushTime = System.currentTimeMillis();
try {
while (!Thread.currentThread().isInterrupted()) {
Thread.sleep(Math.max(commitMillis - (System.currentTimeMillis() - lastFlushTime), 0));
commit();
lastFlushTime = System.currentTimeMillis();
}
}
catch (InterruptedException e) {
Thread.currentThread().interrupt();
log.info("Commit thread interrupted.");
}
catch (Throwable e) {
log.error(e, "Commit thread failed!");
throw Throwables.propagate(e);
}
finally {
stop();
}
}
};
}

  // carried over from the Kafka consumer; an equivalent filter over Kinesis stream names is still TODO
  private static String buildTopicFilter(Map<String, DataSourceConfig<PropertiesBasedKinesisConfig>> dataSourceConfigs)
{
StringBuilder topicFilter = new StringBuilder();
for (Map.Entry<String, DataSourceConfig<PropertiesBasedKinesisConfig>> entry : dataSourceConfigs.entrySet()) {
topicFilter.append(String.format("(%s)|", entry.getValue().propertiesBasedConfig().getTopicPattern()));
}

return topicFilter.length() > 0 ? topicFilter.substring(0, topicFilter.length() - 1) : "";
}
}
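
For orientation while the port is in progress, here is a minimal sketch of how KinesisConsumer might be driven. The harness class is hypothetical, and the config, properties, and writer objects are assumed to be built elsewhere (for example from a properties file, as the tranquility-kafka module does):

package com.metamx.tranquility.kinesis;

import com.metamx.tranquility.config.DataSourceConfig;
import com.metamx.tranquility.kafka.writer.WriterController;
import com.metamx.tranquility.kinesis.model.PropertiesBasedKinesisConfig;

import java.util.Map;
import java.util.Properties;

// Hypothetical harness; illustrates only the intended start/join/stop lifecycle.
public class KinesisConsumerExample
{
  public static void run(
      final PropertiesBasedKinesisConfig globalConfig,
      final Properties kinesisProperties,
      final Map<String, DataSourceConfig<PropertiesBasedKinesisConfig>> dataSourceConfigs,
      final WriterController writerController
  ) throws InterruptedException
  {
    final KinesisConsumer consumer = new KinesisConsumer(
        globalConfig,
        kinesisProperties,
        dataSourceConfigs,
        writerController
    );
    consumer.start();   // currently starts only the commit thread; shard consumers are still TODO
    try {
      consumer.join();  // block until the commit thread exits
    }
    finally {
      consumer.stop();  // flush remaining events to Druid and shut down cleanly
    }
  }
}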
new file: KinesisEventConsumer.java
@@ -0,0 +1,62 @@
package com.metamx.tranquility.kinesis;

import java.nio.charset.StandardCharsets;
import java.util.List;

import com.amazonaws.auth.DefaultAWSCredentialsProviderChain;
import com.amazonaws.services.kinesis.clientlibrary.interfaces.IRecordProcessor;
import com.amazonaws.services.kinesis.clientlibrary.interfaces.IRecordProcessorCheckpointer;
import com.amazonaws.services.kinesis.clientlibrary.interfaces.IRecordProcessorFactory;
import com.amazonaws.services.kinesis.clientlibrary.lib.worker.InitialPositionInStream;
import com.amazonaws.services.kinesis.clientlibrary.lib.worker.KinesisClientLibConfiguration;
import com.amazonaws.services.kinesis.clientlibrary.lib.worker.ShutdownReason;
import com.amazonaws.services.kinesis.clientlibrary.lib.worker.Worker;
import com.amazonaws.services.kinesis.model.Record;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

public class KinesisEventConsumer implements IRecordProcessorFactory {
private static final Logger LOGGER = LoggerFactory.getLogger(KinesisEventConsumer.class);
private Worker.Builder builder;

public KinesisEventConsumer(String region, String streamName, String appName, String initialPosition) {

InitialPositionInStream position = InitialPositionInStream.valueOf(initialPosition);

    // the fourth constructor argument is the KCL worker id; the application name is reused for it here
    KinesisClientLibConfiguration clientConfig = new KinesisClientLibConfiguration(appName, streamName,
        new DefaultAWSCredentialsProviderChain(), appName)
            .withRegionName(region)
            .withInitialPositionInStream(position);

this.builder = new Worker.Builder().recordProcessorFactory(this).config(clientConfig);
}

  public void start() {
    // Worker.run() blocks, polling shards until the worker is shut down
    this.builder.build().run();
  }

@Override
public IRecordProcessor createProcessor() {
LOGGER.debug("Creating recordProcessor.");
return new IRecordProcessor() {
@Override
public void initialize(String shardId) {}

      @Override
      public void processRecords(List<Record> records, IRecordProcessorCheckpointer checkpointer) {
        // WIP: records are only logged for now; nothing is forwarded to Druid or checkpointed yet
        for (Record record : records) {
          byte[] bytes = new byte[record.getData().remaining()];
          record.getData().get(bytes);
          String data = new String(bytes, StandardCharsets.UTF_8);
          LOGGER.debug("Received [{}]", data);
        }
      }

      @Override
      public void shutdown(IRecordProcessorCheckpointer checkpointer, ShutdownReason reason) {
        // WIP: on ShutdownReason.TERMINATE the KCL expects a final checkpoint before the shard is retired
        LOGGER.debug("Shutting down, reason: [{}]", reason);
      }
};

}
}
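
A corresponding sketch for driving KinesisEventConsumer directly; the region, stream, and application names below are placeholders, not values from this patch. Credentials are resolved through the DefaultAWSCredentialsProviderChain, so the usual environment variables, profile, or instance role must be in place:

package com.metamx.tranquility.kinesis;

// Hypothetical driver; all constructor arguments are placeholders.
public class KinesisEventConsumerExample
{
  public static void main(String[] args)
  {
    // "LATEST" starts at the tip of the stream; "TRIM_HORIZON" would replay from the oldest retained record.
    KinesisEventConsumer consumer = new KinesisEventConsumer(
        "us-east-1",           // AWS region (placeholder)
        "example-stream",      // Kinesis stream name (placeholder)
        "tranquility-example", // KCL application name, also reused as the worker id
        "LATEST"
    );
    consumer.start(); // blocks: the underlying KCL Worker polls shards until shut down
  }
}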