diff --git a/kafka/hbase-kafka-proxy/src/main/java/org/apache/hadoop/hbase/kafka/DropRule.java b/kafka/hbase-kafka-proxy/src/main/java/org/apache/hadoop/hbase/kafka/DropRule.java
index 8bc1effe..01f7a349 100644
--- a/kafka/hbase-kafka-proxy/src/main/java/org/apache/hadoop/hbase/kafka/DropRule.java
+++ b/kafka/hbase-kafka-proxy/src/main/java/org/apache/hadoop/hbase/kafka/DropRule.java
@@ -1,4 +1,4 @@
-/**
+/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
@@ -6,9 +6,9 @@
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
diff --git a/kafka/hbase-kafka-proxy/src/main/java/org/apache/hadoop/hbase/kafka/DumpToStringListener.java b/kafka/hbase-kafka-proxy/src/main/java/org/apache/hadoop/hbase/kafka/DumpToStringListener.java
index 477cddd8..c968acc8 100755
--- a/kafka/hbase-kafka-proxy/src/main/java/org/apache/hadoop/hbase/kafka/DumpToStringListener.java
+++ b/kafka/hbase-kafka-proxy/src/main/java/org/apache/hadoop/hbase/kafka/DumpToStringListener.java
@@ -1,4 +1,4 @@
-/**
+/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
@@ -6,9 +6,9 @@
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
@@ -22,13 +22,11 @@
import java.util.Iterator;
import java.util.Properties;
import java.util.stream.Collectors;
-
import org.apache.avro.io.BinaryDecoder;
import org.apache.avro.io.DecoderFactory;
import org.apache.avro.specific.SpecificDatumReader;
import org.apache.hadoop.hbase.util.Bytes;
import org.apache.hadoop.hbase.util.VersionInfo;
-
import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.clients.consumer.ConsumerRecords;
import org.apache.kafka.clients.consumer.KafkaConsumer;
@@ -44,14 +42,14 @@
import org.apache.hbase.thirdparty.org.apache.commons.cli.ParseException;
/**
- * connects to kafka and reads from the passed in topics. Parses each message into an avro object
+ * connects to kafka and reads from the passed in topics. Parses each message into an avro object
* and dumps it to the console.
*/
@InterfaceAudience.Private
public final class DumpToStringListener {
private static final Logger LOG = LoggerFactory.getLogger(DumpToStringListener.class);
- private DumpToStringListener(){
+ private DumpToStringListener() {
}
public static void main(String[] args) {
@@ -59,10 +57,9 @@ public static void main(String[] args) {
VersionInfo.logVersion();
Options options = new Options();
- options.addRequiredOption("k", "kafkabrokers", true, "Kafka Brokers " +
- "(comma delimited)");
- options.addRequiredOption("t", "kafkatopics", true,"Kafka Topics "
- + "to subscribe to (comma delimited)");
+ options.addRequiredOption("k", "kafkabrokers", true, "Kafka Brokers " + "(comma delimited)");
+ options.addRequiredOption("t", "kafkatopics", true,
+ "Kafka Topics " + "to subscribe to (comma delimited)");
CommandLine commandLine = null;
try {
@@ -73,7 +70,7 @@ public static void main(String[] args) {
}
SpecificDatumReader<HBaseKafkaEvent> dreader =
- new SpecificDatumReader<>(HBaseKafkaEvent.SCHEMA$);
+ new SpecificDatumReader<>(HBaseKafkaEvent.SCHEMA$);
String topic = commandLine.getOptionValue('t');
Properties props = new Properties();
@@ -105,8 +102,9 @@ public static void main(String[] args) {
private static void printUsageAndExit(Options options, int exitCode) {
HelpFormatter formatter = new HelpFormatter();
formatter.printHelp("hbase " + DumpToStringListener.class.getName(), "", options,
- "\n[--kafkabrokers ] " +
- "[-k ] \n", true);
+ "\n[--kafkabrokers ] "
+ + "[-k ] \n",
+ true);
System.exit(exitCode);
}
}
diff --git a/kafka/hbase-kafka-proxy/src/main/java/org/apache/hadoop/hbase/kafka/KafkaBridgeConnection.java b/kafka/hbase-kafka-proxy/src/main/java/org/apache/hadoop/hbase/kafka/KafkaBridgeConnection.java
index 68a88aff..bcb6704a 100644
--- a/kafka/hbase-kafka-proxy/src/main/java/org/apache/hadoop/hbase/kafka/KafkaBridgeConnection.java
+++ b/kafka/hbase-kafka-proxy/src/main/java/org/apache/hadoop/hbase/kafka/KafkaBridgeConnection.java
@@ -1,4 +1,4 @@
-/**
+/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
@@ -6,9 +6,9 @@
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
@@ -22,7 +22,6 @@
import java.io.IOException;
import java.util.Properties;
import java.util.concurrent.ExecutorService;
-
import org.apache.avro.io.DatumWriter;
import org.apache.avro.specific.SpecificDatumWriter;
import org.apache.commons.lang3.StringUtils;
@@ -44,27 +43,25 @@
/**
* a alternative implementation of a connection object that forwards the mutations to a kafka queue
* depending on the routing rules (see kafka-route-rules.xml).
- * */
+ */
@InterfaceAudience.Private
public class KafkaBridgeConnection implements Connection {
private final Configuration conf;
private volatile boolean closed = false;
private TopicRoutingRules routingRules;
- private Producer<byte[],byte[]> producer;
+ private Producer<byte[], byte[]> producer;
private DatumWriter<HBaseKafkaEvent> avroWriter =
- new SpecificDatumWriter<>(HBaseKafkaEvent.getClassSchema());
-
-
- /**
- * Public constructor
- * @param conf hbase configuration
- * @param pool executor pool
- * @param user user who requested connection
- * @throws IOException on error
- */
- public KafkaBridgeConnection(Configuration conf,
- ExecutorService pool,
- User user) throws IOException {
+ new SpecificDatumWriter<>(HBaseKafkaEvent.getClassSchema());
+
+ /**
+ * Public constructor
+ * @param conf hbase configuration
+ * @param pool executor pool
+ * @param user user who requested connection
+ * @throws IOException on error
+ */
+ public KafkaBridgeConnection(Configuration conf, ExecutorService pool, User user)
+ throws IOException {
this.conf = conf;
setupRules();
startKafkaConnection();
@@ -72,12 +69,12 @@ public KafkaBridgeConnection(Configuration conf,
/**
* for testing.
- * @param conf hbase configuration
+ * @param conf hbase configuration
* @param routingRules a set of routing rules
- * @param producer a kafka producer
+ * @param producer a kafka producer
*/
public KafkaBridgeConnection(Configuration conf, TopicRoutingRules routingRules,
- Producer<byte[],byte[]> producer) {
+ Producer<byte[], byte[]> producer) {
this.conf = conf;
this.producer = producer;
this.routingRules = routingRules;
@@ -86,7 +83,7 @@ public KafkaBridgeConnection(Configuration conf, TopicRoutingRules routingRules,
private void setupRules() throws IOException {
String file = this.conf.get(KafkaProxy.KAFKA_PROXY_RULES_FILE);
routingRules = new TopicRoutingRules();
- try (FileInputStream fin = new FileInputStream(file);){
+ try (FileInputStream fin = new FileInputStream(file);) {
routingRules.parseRules(fin);
}
}
@@ -94,28 +91,26 @@ private void setupRules() throws IOException {
private void startKafkaConnection() throws IOException {
Properties configProperties = new Properties();
- String kafkaPropsFile = conf.get(KafkaProxy.KAFKA_PROXY_KAFKA_PROPERTIES,"");
- if (!StringUtils.isEmpty(kafkaPropsFile)){
- try (FileInputStream fs = new java.io.FileInputStream(
- new File(kafkaPropsFile))){
+ String kafkaPropsFile = conf.get(KafkaProxy.KAFKA_PROXY_KAFKA_PROPERTIES, "");
+ if (!StringUtils.isEmpty(kafkaPropsFile)) {
+ try (FileInputStream fs = new java.io.FileInputStream(new File(kafkaPropsFile))) {
configProperties.load(fs);
}
} else {
- String kafkaServers =conf.get(KafkaProxy.KAFKA_PROXY_KAFKA_BROKERS);
+ String kafkaServers = conf.get(KafkaProxy.KAFKA_PROXY_KAFKA_BROKERS);
configProperties.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, kafkaServers);
}
configProperties.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG,
- "org.apache.kafka.common.serialization.ByteArraySerializer");
+ "org.apache.kafka.common.serialization.ByteArraySerializer");
configProperties.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG,
- "org.apache.kafka.common.serialization.ByteArraySerializer");
+ "org.apache.kafka.common.serialization.ByteArraySerializer");
this.producer = new KafkaProducer(configProperties);
}
-
-
@Override
- public void abort(String why, Throwable e) {}
+ public void abort(String why, Throwable e) {
+ }
@Override
public boolean isAborted() {
@@ -194,7 +189,8 @@ public TableBuilder setWriteRpcTimeout(int timeout) {
@Override
public Table build() {
- return new KafkaTableForBridge(tn,passedInConfiguration,routingRules,producer,avroWriter) ;
+ return new KafkaTableForBridge(tn, passedInConfiguration, routingRules, producer,
+ avroWriter);
}
};
}
diff --git a/kafka/hbase-kafka-proxy/src/main/java/org/apache/hadoop/hbase/kafka/KafkaProxy.java b/kafka/hbase-kafka-proxy/src/main/java/org/apache/hadoop/hbase/kafka/KafkaProxy.java
index 2c1aca12..5dda9576 100755
--- a/kafka/hbase-kafka-proxy/src/main/java/org/apache/hadoop/hbase/kafka/KafkaProxy.java
+++ b/kafka/hbase-kafka-proxy/src/main/java/org/apache/hadoop/hbase/kafka/KafkaProxy.java
@@ -1,4 +1,4 @@
-/**
+/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
@@ -6,9 +6,9 @@
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
@@ -24,7 +24,6 @@
import java.util.Map;
import java.util.UUID;
import java.util.stream.Collectors;
-
import org.apache.commons.lang3.StringUtils;
import org.apache.curator.framework.CuratorFramework;
import org.apache.curator.framework.CuratorFrameworkFactory;
@@ -45,7 +44,6 @@
import org.apache.hadoop.hbase.util.VersionInfo;
import org.apache.hadoop.util.GenericOptionsParser;
import org.apache.yetus.audience.InterfaceAudience;
-
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
@@ -55,14 +53,10 @@
import org.apache.hbase.thirdparty.org.apache.commons.cli.Options;
import org.apache.hbase.thirdparty.org.apache.commons.cli.ParseException;
-
-
/**
- * hbase to kafka bridge.
- *
- * Starts up a region server and receives replication events, just like a peer
- * cluster member. It takes the events and cell by cell determines how to
- * route them (see kafka-route-rules.xml)
+ * hbase to kafka bridge. Starts up a region server and receives replication events, just like a
+ * peer cluster member. It takes the events and, cell by cell, determines how to route them (see
+ * kafka-route-rules.xml)
*/
@InterfaceAudience.Private
public final class KafkaProxy {
@@ -72,47 +66,45 @@ public final class KafkaProxy {
public static final String KAFKA_PROXY_KAFKA_PROPERTIES = "kafkaproxy.kafka.properties";
public static final String KAFKA_PROXY_KAFKA_BROKERS = "kafkaproxy.kafka.brokers";
- private static Map<String,String> DEFAULT_PROPERTIES = new HashMap<>();
- private static Map<String,String> CAN_OVERRIDE_DEFAULT_PROPERTIES = new HashMap<>();
-
+ private static Map<String, String> DEFAULT_PROPERTIES = new HashMap<>();
+ private static Map<String, String> CAN_OVERRIDE_DEFAULT_PROPERTIES = new HashMap<>();
static {
- DEFAULT_PROPERTIES.put("hbase.cluster.distributed","true");
- DEFAULT_PROPERTIES.put("zookeeper.znode.parent","/kafkaproxy");
- DEFAULT_PROPERTIES.put("hbase.regionserver.info.port","17010");
+ DEFAULT_PROPERTIES.put("hbase.cluster.distributed", "true");
+ DEFAULT_PROPERTIES.put("zookeeper.znode.parent", "/kafkaproxy");
+ DEFAULT_PROPERTIES.put("hbase.regionserver.info.port", "17010");
DEFAULT_PROPERTIES.put("hbase.client.connection.impl",
- "org.apache.hadoop.hbase.kafka.KafkaBridgeConnection");
- DEFAULT_PROPERTIES.put("hbase.regionserver.admin.service","false");
- DEFAULT_PROPERTIES.put("hbase.regionserver.client.service","false");
- DEFAULT_PROPERTIES.put("hbase.wal.provider",
- "org.apache.hadoop.hbase.wal.DisabledWALProvider");
- DEFAULT_PROPERTIES.put("hbase.regionserver.workers","false");
- DEFAULT_PROPERTIES.put("hfile.block.cache.size","0.0001");
- DEFAULT_PROPERTIES.put("hbase.mob.file.cache.size","0");
- DEFAULT_PROPERTIES.put("hbase.masterless","true");
- DEFAULT_PROPERTIES.put("hbase.regionserver.metahandler.count","1");
- DEFAULT_PROPERTIES.put("hbase.regionserver.replication.handler.count","1");
- DEFAULT_PROPERTIES.put("hbase.regionserver.handler.count","1");
- DEFAULT_PROPERTIES.put("hbase.ipc.server.read.threadpool.size","3");
-
- CAN_OVERRIDE_DEFAULT_PROPERTIES.put("hbase.regionserver.port","17020");
+ "org.apache.hadoop.hbase.kafka.KafkaBridgeConnection");
+ DEFAULT_PROPERTIES.put("hbase.regionserver.admin.service", "false");
+ DEFAULT_PROPERTIES.put("hbase.regionserver.client.service", "false");
+ DEFAULT_PROPERTIES.put("hbase.wal.provider", "org.apache.hadoop.hbase.wal.DisabledWALProvider");
+ DEFAULT_PROPERTIES.put("hbase.regionserver.workers", "false");
+ DEFAULT_PROPERTIES.put("hfile.block.cache.size", "0.0001");
+ DEFAULT_PROPERTIES.put("hbase.mob.file.cache.size", "0");
+ DEFAULT_PROPERTIES.put("hbase.masterless", "true");
+ DEFAULT_PROPERTIES.put("hbase.regionserver.metahandler.count", "1");
+ DEFAULT_PROPERTIES.put("hbase.regionserver.replication.handler.count", "1");
+ DEFAULT_PROPERTIES.put("hbase.regionserver.handler.count", "1");
+ DEFAULT_PROPERTIES.put("hbase.ipc.server.read.threadpool.size", "3");
+
+ CAN_OVERRIDE_DEFAULT_PROPERTIES.put("hbase.regionserver.port", "17020");
}
private static void printUsageAndExit(Options options, int exitCode) {
HelpFormatter formatter = new HelpFormatter();
formatter.printHelp("hbase kafkaproxy start", "", options,
- "\nTo run the kafka proxy as a daemon, execute " +
- "hbase-connectors-daemon.sh start|stop kafkaproxy \n" +
- "[--kafkabrokers (or -b) ] \n" +
- "[--routerulesfile (or -r) ] \n" +
- "[--kafkaproperties (or -f) ] \n" +
- "[--peername (or -p) name of hbase peer to use (defaults to hbasekafka)]\n " +
- "[--znode (or -z) root znode (defaults to /kafkaproxy)] \n" +
- "[--enablepeer (or -e) enable peer on startup (defaults to false)]\n " +
- "[--auto (or -a) auto create peer] " +
- "\n", true);
+ "\nTo run the kafka proxy as a daemon, execute "
+ + "hbase-connectors-daemon.sh start|stop kafkaproxy \n"
+ + "[--kafkabrokers (or -b) ] \n"
+ + "[--routerulesfile (or -r) ] \n"
+ + "[--kafkaproperties (or -f) ] \n"
+ + "[--peername (or -p) name of hbase peer to use (defaults to hbasekafka)]\n "
+ + "[--znode (or -z) root znode (defaults to /kafkaproxy)] \n"
+ + "[--enablepeer (or -e) enable peer on startup (defaults to false)]\n "
+ + "[--auto (or -a) auto create peer] " + "\n",
+ true);
System.exit(exitCode);
}
@@ -130,24 +122,20 @@ private KafkaProxy() {
*/
public static void main(String[] args) throws Exception {
- Map<String,String> otherProps = new HashMap<>();
+ Map<String, String> otherProps = new HashMap<>();
Options options = new Options();
- options.addRequiredOption("b", "kafkabrokers", true,
- "Kafka Brokers (comma delimited)");
+ options.addRequiredOption("b", "kafkabrokers", true, "Kafka Brokers (comma delimited)");
options.addOption("r", "routerulesfile", true,
"file that has routing rules (defaults to conf/kafka-route-rules.xml");
options.addOption("f", "kafkaproperties", true,
"Path to properties file that has the kafka connection properties");
- options.addRequiredOption("p", "peername", true,
- "Name of hbase peer");
+ options.addRequiredOption("p", "peername", true, "Name of hbase peer");
options.addOption("z", "znode", true,
- "root zode to use in zookeeper (defaults to /kafkaproxy)");
- options.addOption("a", "autopeer", false,
- "Create a peer automatically to the hbase cluster");
- options.addOption("e", "enablepeer", false,
- "enable peer on startup (defaults to false)");
+ "root zode to use in zookeeper (defaults to /kafkaproxy)");
+ options.addOption("a", "autopeer", false, "Create a peer automatically to the hbase cluster");
+ options.addOption("e", "enablepeer", false, "enable peer on startup (defaults to false)");
LOG.info("STARTING executorService " + HRegionServer.class.getSimpleName());
VersionInfo.logVersion();
@@ -168,9 +156,8 @@ public static void main(String[] args) throws Exception {
printUsageAndExit(options, -1);
}
-
- String peer="";
- if (!commandLine.hasOption('p')){
+ String peer = "";
+ if (!commandLine.hasOption('p')) {
System.err.println("hbase peer id is required");
System.exit(-1);
} else {
@@ -180,154 +167,141 @@ public static void main(String[] args) throws Exception {
boolean createPeer = false;
boolean enablePeer = false;
- if (commandLine.hasOption('a')){
- createPeer=true;
+ if (commandLine.hasOption('a')) {
+ createPeer = true;
}
- if (commandLine.hasOption("a")){
- enablePeer=true;
+ if (commandLine.hasOption("a")) {
+ enablePeer = true;
}
- String rulesFile = StringUtils.defaultIfBlank(
- commandLine.getOptionValue("r"),"kafka-route-rules.xml");
+ String rulesFile =
+ StringUtils.defaultIfBlank(commandLine.getOptionValue("r"), "kafka-route-rules.xml");
- if (!new File(rulesFile).exists()){
- if (KafkaProxy.class.getClassLoader().getResource(rulesFile)!=null){
+ if (!new File(rulesFile).exists()) {
+ if (KafkaProxy.class.getClassLoader().getResource(rulesFile) != null) {
rulesFile = KafkaProxy.class.getClassLoader().getResource(rulesFile).getFile();
} else {
- System.err.println("Rules file " + rulesFile +
- " is invalid");
+ System.err.println("Rules file " + rulesFile + " is invalid");
System.exit(-1);
}
}
- otherProps.put(KafkaProxy.KAFKA_PROXY_RULES_FILE,rulesFile);
+ otherProps.put(KafkaProxy.KAFKA_PROXY_RULES_FILE, rulesFile);
- if (commandLine.hasOption('f')){
- otherProps.put(KafkaProxy.KAFKA_PROXY_KAFKA_PROPERTIES,commandLine.getOptionValue('f'));
- } else if (commandLine.hasOption('b')){
- otherProps.put(KafkaProxy.KAFKA_PROXY_KAFKA_BROKERS,commandLine.getOptionValue('b'));
+ if (commandLine.hasOption('f')) {
+ otherProps.put(KafkaProxy.KAFKA_PROXY_KAFKA_PROPERTIES, commandLine.getOptionValue('f'));
+ } else if (commandLine.hasOption('b')) {
+ otherProps.put(KafkaProxy.KAFKA_PROXY_KAFKA_BROKERS, commandLine.getOptionValue('b'));
} else {
System.err.println("Kafka connection properites or brokers must be specified");
System.exit(-1);
}
- String zookeeperQ = conf.get("hbase.zookeeper.quorum") + ":" +
- conf.get("hbase.zookeeper.property.clientPort");
+ String zookeeperQ =
+ conf.get("hbase.zookeeper.quorum") + ":" + conf.get("hbase.zookeeper.property.clientPort");
ExponentialBackoffRetry retryPolicy = new ExponentialBackoffRetry(20000, 20);
- try (CuratorFramework zk = CuratorFrameworkFactory.newClient(zookeeperQ, retryPolicy);
- ) {
+ try (CuratorFramework zk = CuratorFrameworkFactory.newClient(zookeeperQ, retryPolicy);) {
zk.start();
String rootZnode = "/kafkaproxy";
- setupZookeeperZnodes(zk,rootZnode,peer);
- checkForOrCreateReplicationPeer(conf,zk,rootZnode,peer,createPeer,enablePeer);
+ setupZookeeperZnodes(zk, rootZnode, peer);
+ checkForOrCreateReplicationPeer(conf, zk, rootZnode, peer, createPeer, enablePeer);
}
@SuppressWarnings("unchecked")
Class<? extends HRegionServer> regionServerClass = (Class<? extends HRegionServer>) conf
- .getClass(HConstants.REGION_SERVER_IMPL, HRegionServer.class);
+ .getClass(HConstants.REGION_SERVER_IMPL, HRegionServer.class);
List<String> allArgs = DEFAULT_PROPERTIES.keySet().stream()
- .map((argKey)->("-D"+argKey+"="+ DEFAULT_PROPERTIES.get(argKey)))
- .collect(Collectors.toList());
+ .map((argKey) -> ("-D" + argKey + "=" + DEFAULT_PROPERTIES.get(argKey)))
+ .collect(Collectors.toList());
allArgs.addAll(CAN_OVERRIDE_DEFAULT_PROPERTIES.keySet().stream()
- .filter((argKey)->commandLineConf.get(argKey,"").equalsIgnoreCase(""))
- .map((argKey)->("-D"+argKey+"="+ CAN_OVERRIDE_DEFAULT_PROPERTIES.get(argKey)))
- .collect(Collectors.toList()));
+ .filter((argKey) -> commandLineConf.get(argKey, "").equalsIgnoreCase(""))
+ .map((argKey) -> ("-D" + argKey + "=" + CAN_OVERRIDE_DEFAULT_PROPERTIES.get(argKey)))
+ .collect(Collectors.toList()));
- for (Map.Entry<String,String> k : commandLineConf){
- allArgs.add("-D"+k.getKey()+"="+k.getValue());
+ for (Map.Entry<String, String> k : commandLineConf) {
+ allArgs.add("-D" + k.getKey() + "=" + k.getValue());
}
- otherProps.keySet().stream()
- .map((argKey)->("-D"+argKey+"="+ otherProps.get(argKey)))
- .forEach((item)->allArgs.add(item));
+ otherProps.keySet().stream().map((argKey) -> ("-D" + argKey + "=" + otherProps.get(argKey)))
+ .forEach((item) -> allArgs.add(item));
- Arrays.stream(restArgs)
- .filter((arg)->(arg.startsWith("-D")||arg.equals("start")))
- .forEach((arg)->allArgs.add(arg));
+ Arrays.stream(restArgs).filter((arg) -> (arg.startsWith("-D") || arg.equals("start")))
+ .forEach((arg) -> allArgs.add(arg));
// is start there?
- if (allArgs.stream()
- .filter((arg)->arg.equalsIgnoreCase("start")).count() < 1){
+ if (allArgs.stream().filter((arg) -> arg.equalsIgnoreCase("start")).count() < 1) {
allArgs.add("start");
}
- String[] newArgs=new String[allArgs.size()];
+ String[] newArgs = new String[allArgs.size()];
allArgs.toArray(newArgs);
new HRegionServerCommandLine(regionServerClass).doMain(newArgs);
}
-
/**
* Set up the needed znodes under the rootZnode
- * @param zk CuratorFramework framework instance
+ * @param zk CuratorFramework framework instance
* @param rootZnode Root znode
* @throws Exception If an error occurs
*/
- public static void setupZookeeperZnodes(CuratorFramework zk, String rootZnode,String peer)
- throws Exception {
+ public static void setupZookeeperZnodes(CuratorFramework zk, String rootZnode, String peer)
+ throws Exception {
// always gives the same uuid for the same name
UUID uuid = UUID.nameUUIDFromBytes(Bytes.toBytes(peer));
String newValue = uuid.toString();
- byte []uuidBytes = Bytes.toBytes(newValue);
- String idPath=rootZnode+"/hbaseid";
+ byte[] uuidBytes = Bytes.toBytes(newValue);
+ String idPath = rootZnode + "/hbaseid";
if (zk.checkExists().forPath(idPath) == null) {
zk.create().forPath(rootZnode);
- zk.create().forPath(rootZnode +"/hbaseid",uuidBytes);
+ zk.create().forPath(rootZnode + "/hbaseid", uuidBytes);
} else {
// If the znode is there already make sure it has the
// expected value for the peer name.
byte[] znodeBytes = zk.getData().forPath(idPath).clone();
- if (!Bytes.equals(znodeBytes,uuidBytes)){
+ if (!Bytes.equals(znodeBytes, uuidBytes)) {
String oldValue = Bytes.toString(znodeBytes);
- LOG.warn("znode "+idPath+" has unexpected value "+ oldValue
- +" expecting " + newValue + " "
- + " (did the peer name for the proxy change?) "
- + "Updating value");
+ LOG.warn("znode " + idPath + " has unexpected value " + oldValue + " expecting " + newValue
+ + " " + " (did the peer name for the proxy change?) " + "Updating value");
zk.setData().forPath(idPath, uuidBytes);
}
}
}
/**
- * Poll for the configured peer or create it if it does not exist
- * (controlled by createIfMissing)
- * @param hbaseConf the hbase configuratoin
- * @param zk CuratorFramework object
- * @param basePath base znode.
- * @param peerName id if the peer to check for/create
- * @param enablePeer if the peer is detected or created, enable it.
+ * Poll for the configured peer or create it if it does not exist (controlled by createIfMissing)
+ * @param hbaseConf the hbase configuration
+ * @param zk CuratorFramework object
+ * @param basePath base znode.
+ * @param peerName id of the peer to check for/create
+ * @param enablePeer if the peer is detected or created, enable it.
* @param createIfMissing if the peer doesn't exist, create it and peer to it.
*/
- public static void checkForOrCreateReplicationPeer(Configuration hbaseConf,
- CuratorFramework zk,
- String basePath,
- String peerName, boolean createIfMissing,
- boolean enablePeer) {
+ public static void checkForOrCreateReplicationPeer(Configuration hbaseConf, CuratorFramework zk,
+ String basePath, String peerName, boolean createIfMissing, boolean enablePeer) {
try (Connection conn = ConnectionFactory.createConnection(hbaseConf);
- Admin admin = conn.getAdmin()) {
+ Admin admin = conn.getAdmin()) {
boolean peerThere = false;
while (!peerThere) {
try {
ReplicationPeerConfig peerConfig = admin.getReplicationPeerConfig(peerName);
- if (peerConfig !=null) {
- peerThere=true;
+ if (peerConfig != null) {
+ peerThere = true;
}
} catch (ReplicationPeerNotFoundException e) {
if (createIfMissing) {
ReplicationPeerConfigBuilder builder = ReplicationPeerConfig.newBuilder();
// get the current cluster's ZK config
- String zookeeperQ = hbaseConf.get("hbase.zookeeper.quorum") +
- ":" +
- hbaseConf.get("hbase.zookeeper.property.clientPort");
- String znodePath = zookeeperQ + ":"+basePath;
+ String zookeeperQ = hbaseConf.get("hbase.zookeeper.quorum") + ":"
+ + hbaseConf.get("hbase.zookeeper.property.clientPort");
+ String znodePath = zookeeperQ + ":" + basePath;
ReplicationPeerConfig rconf = builder.setClusterKey(znodePath).build();
admin.addReplicationPeer(peerName, rconf);
peerThere = true;
@@ -335,20 +309,19 @@ public static void checkForOrCreateReplicationPeer(Configuration hbaseConf,
}
if (peerThere) {
- if (enablePeer){
+ if (enablePeer) {
LOG.info("enable peer,{}", peerName);
List<ReplicationPeerDescription> peers = admin.listReplicationPeers().stream()
- .filter(peer -> peer.getPeerId().equals(peerName))
- .filter(peer -> !peer.isEnabled())
- .collect(Collectors.toList());
- if (!peers.isEmpty()){
+ .filter(peer -> peer.getPeerId().equals(peerName)).filter(peer -> !peer.isEnabled())
+ .collect(Collectors.toList());
+ if (!peers.isEmpty()) {
admin.enableReplicationPeer(peerName);
}
}
break;
} else {
- LOG.info("peer "+
- peerName+" not found, service will not completely start until the peer exists");
+ LOG.info("peer " + peerName
+ + " not found, service will not completely start until the peer exists");
}
Thread.sleep(5000);
}
@@ -356,7 +329,7 @@ public static void checkForOrCreateReplicationPeer(Configuration hbaseConf,
LOG.info("found replication peer " + peerName);
} catch (Exception e) {
- LOG.error("Exception running proxy ",e);
+ LOG.error("Exception running proxy ", e);
}
}
}
diff --git a/kafka/hbase-kafka-proxy/src/main/java/org/apache/hadoop/hbase/kafka/KafkaTableForBridge.java b/kafka/hbase-kafka-proxy/src/main/java/org/apache/hadoop/hbase/kafka/KafkaTableForBridge.java
index 31baa44e..6103aaee 100755
--- a/kafka/hbase-kafka-proxy/src/main/java/org/apache/hadoop/hbase/kafka/KafkaTableForBridge.java
+++ b/kafka/hbase-kafka-proxy/src/main/java/org/apache/hadoop/hbase/kafka/KafkaTableForBridge.java
@@ -1,4 +1,4 @@
-/**
+/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
@@ -6,9 +6,9 @@
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
@@ -24,7 +24,6 @@
import java.util.List;
import java.util.concurrent.Future;
import java.util.stream.Collectors;
-
import org.apache.avro.io.BinaryEncoder;
import org.apache.avro.io.DatumWriter;
import org.apache.avro.io.EncoderFactory;
@@ -49,7 +48,6 @@
import org.apache.hbase.thirdparty.org.apache.commons.collections4.CollectionUtils;
-
@InterfaceAudience.Private
public class KafkaTableForBridge implements Table {
private Logger LOG = LoggerFactory.getLogger(KafkaTableForBridge.class);
@@ -58,14 +56,14 @@ public class KafkaTableForBridge implements Table {
private final TableName tableName;
private byte[] tableAsBytes;
- private Producer<byte[],byte[]> producer;
+ private Producer<byte[], byte[]> producer;
private TopicRoutingRules routingRules;
private DatumWriter<HBaseKafkaEvent> avroWriter;
private static final class CheckMutation {
- byte[]qualifier;
- byte[]family;
+ byte[] qualifier;
+ byte[] family;
Cell cell;
List<String> topics = new ArrayList<>();
}
@@ -75,106 +73,94 @@ public RegionLocator getRegionLocator() throws IOException {
throw new UnsupportedOperationException();
}
- public KafkaTableForBridge(TableName tableName,
- Configuration conf,
- TopicRoutingRules routingRules,
- Producer<byte[],byte[]> producer,
- DatumWriter<HBaseKafkaEvent> avroWriter){
- this.conf=conf;
- this.tableName=tableName;
- this.tableAsBytes=this.tableName.toBytes();
- this.routingRules=routingRules;
- this.producer=producer;
- this.avroWriter=avroWriter;
+ public KafkaTableForBridge(TableName tableName, Configuration conf,
+ TopicRoutingRules routingRules, Producer<byte[], byte[]> producer,
+ DatumWriter<HBaseKafkaEvent> avroWriter) {
+ this.conf = conf;
+ this.tableName = tableName;
+ this.tableAsBytes = this.tableName.toBytes();
+ this.routingRules = routingRules;
+ this.producer = producer;
+ this.avroWriter = avroWriter;
}
- private List<Pair<String,HBaseKafkaEvent>> processMutation(CheckMutation check, boolean isDelete){
+ private List<Pair<String, HBaseKafkaEvent>> processMutation(CheckMutation check,
+ boolean isDelete) {
HBaseKafkaEvent event = new HBaseKafkaEvent();
- event.setKey(ByteBuffer.wrap(check.cell.getRowArray(),
- check.cell.getRowOffset(),
- check.cell.getRowLength()));
+ event.setKey(ByteBuffer.wrap(check.cell.getRowArray(), check.cell.getRowOffset(),
+ check.cell.getRowLength()));
event.setTable(ByteBuffer.wrap(tableAsBytes));
event.setDelete(isDelete);
event.setTimestamp(check.cell.getTimestamp());
event.setFamily(ByteBuffer.wrap(check.family));
event.setQualifier(ByteBuffer.wrap(check.qualifier));
- event.setValue(ByteBuffer.wrap(check.cell.getValueArray(),
- check.cell.getValueOffset(),
- check.cell.getValueLength()));
+ event.setValue(ByteBuffer.wrap(check.cell.getValueArray(), check.cell.getValueOffset(),
+ check.cell.getValueLength()));
- return check.topics.stream()
- .map((topic)->new Pair<String,HBaseKafkaEvent>(topic,event))
- .collect(Collectors.toList());
+ return check.topics.stream().map((topic) -> new Pair<String, HBaseKafkaEvent>(topic, event))
+ .collect(Collectors.toList());
}
- private boolean keep(CheckMutation ret){
- if (!routingRules.isExclude(this.tableName,ret.family, ret.qualifier)){
+ private boolean keep(CheckMutation ret) {
+ if (!routingRules.isExclude(this.tableName, ret.family, ret.qualifier)) {
return true;
}
return false;
}
- private CheckMutation addTopics(CheckMutation ret){
- ret.topics= routingRules.getTopics(this.tableName,ret.family,ret.qualifier);
+ private CheckMutation addTopics(CheckMutation ret) {
+ ret.topics = routingRules.getTopics(this.tableName, ret.family, ret.qualifier);
return ret;
}
- private ProducerRecord<byte[],byte[]> toByteArray(ByteArrayOutputStream bout,
- Pair<String,HBaseKafkaEvent> event,
- BinaryEncoder encoder) {
+ private ProducerRecord<byte[], byte[]> toByteArray(ByteArrayOutputStream bout,
+ Pair<String, HBaseKafkaEvent> event, BinaryEncoder encoder) {
try {
bout.reset();
BinaryEncoder encoderUse = EncoderFactory.get().binaryEncoder(bout, encoder);
avroWriter.write(event.getSecond(), encoderUse);
encoder.flush();
- return new ProducerRecord<byte[],byte[]>(event.getFirst(),
- event.getSecond().getKey().array(),
- bout.toByteArray());
- } catch (Exception e){
+ return new ProducerRecord<byte[], byte[]>(event.getFirst(),
+ event.getSecond().getKey().array(), bout.toByteArray());
+ } catch (Exception e) {
throw new RuntimeException(e);
}
}
@Override
public void batch(final List<? extends Row> actions, Object[] results)
- throws IOException, InterruptedException {
+ throws IOException, InterruptedException {
ByteArrayOutputStream bout = new ByteArrayOutputStream();
BinaryEncoder encoderUse = EncoderFactory.get().binaryEncoder(bout, null);
- LOG.debug("got {} inputs ",actions.size());
+ LOG.debug("got {} inputs ", actions.size());
List<Future<RecordMetadata>> sends = new ArrayList<>();
- actions.stream()
- .filter((row)->row instanceof Mutation)
- .map((row)->(Mutation)row)
- .flatMap((row)->{
+ actions.stream().filter((row) -> row instanceof Mutation).map((row) -> (Mutation) row)
+ .flatMap((row) -> {
Mutation mut = (Mutation) row;
boolean isDelete = mut instanceof Delete;
return mut.getFamilyCellMap().keySet().stream()
- .flatMap((family)->mut.getFamilyCellMap().get(family).stream())
- .map((cell)->{
- CheckMutation ret = new CheckMutation();
- ret.family=CellUtil.cloneFamily(cell);
- ret.qualifier=CellUtil.cloneQualifier(cell);
- ret.cell=cell;
- return ret;
- })
- .filter((check)->keep(check))
- .map((check)->addTopics(check))
- .filter((check)->!CollectionUtils.isEmpty(check.topics))
- .flatMap((check)->processMutation(check,isDelete).stream());
- })
- .map((event)->toByteArray(bout,event,encoderUse))
- .forEach((item)->sends.add(producer.send(item)));
+ .flatMap((family) -> mut.getFamilyCellMap().get(family).stream()).map((cell) -> {
+ CheckMutation ret = new CheckMutation();
+ ret.family = CellUtil.cloneFamily(cell);
+ ret.qualifier = CellUtil.cloneQualifier(cell);
+ ret.cell = cell;
+ return ret;
+ }).filter((check) -> keep(check)).map((check) -> addTopics(check))
+ .filter((check) -> !CollectionUtils.isEmpty(check.topics))
+ .flatMap((check) -> processMutation(check, isDelete).stream());
+ }).map((event) -> toByteArray(bout, event, encoderUse))
+ .forEach((item) -> sends.add(producer.send(item)));
// make sure the sends are done before returning
- sends.stream().forEach((sendResult)->{
+ sends.stream().forEach((sendResult) -> {
try {
sendResult.get();
- } catch (Exception e){
- LOG.error("Exception caught when getting result",e);
+ } catch (Exception e) {
+ LOG.error("Exception caught when getting result", e);
throw new RuntimeException(e);
}
});
diff --git a/kafka/hbase-kafka-proxy/src/main/java/org/apache/hadoop/hbase/kafka/Rule.java b/kafka/hbase-kafka-proxy/src/main/java/org/apache/hadoop/hbase/kafka/Rule.java
index 7d02025b..08453ff2 100644
--- a/kafka/hbase-kafka-proxy/src/main/java/org/apache/hadoop/hbase/kafka/Rule.java
+++ b/kafka/hbase-kafka-proxy/src/main/java/org/apache/hadoop/hbase/kafka/Rule.java
@@ -1,4 +1,4 @@
-/**
+/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
@@ -6,9 +6,9 @@
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
@@ -22,29 +22,28 @@
import org.apache.hadoop.hbase.util.Bytes;
import org.apache.yetus.audience.InterfaceAudience;
-
/**
* Implements the matching logic for a rule
*/
@InterfaceAudience.Private
public abstract class Rule {
TableName tableName;
- private byte [] columnFamily;
- private byte [] qualifier;
+ private byte[] columnFamily;
+ private byte[] qualifier;
boolean qualifierStartsWith = false;
boolean qualifierEndsWith = false;
- byte []ast = Bytes.toBytes("*");
+ byte[] ast = Bytes.toBytes("*");
/**
* Indicates if the table,column family, and qualifier match the rule
- * @param tryTable table name to test
+ * @param tryTable table name to test
* @param tryColumFamily column family to test
- * @param tryQualifier qualifier to test
+ * @param tryQualifier qualifier to test
* @return true if values match the rule
*/
- public boolean match(TableName tryTable, byte [] tryColumFamily, byte [] tryQualifier) {
+ public boolean match(TableName tryTable, byte[] tryColumFamily, byte[] tryQualifier) {
boolean tableMatch = tableMatch(tryTable);
boolean columnFamilyMatch = columnFamilyMatch(tryColumFamily);
boolean qualfierMatch = qualifierMatch(tryQualifier);
@@ -57,7 +56,7 @@ public boolean match(TableName tryTable, byte [] tryColumFamily, byte [] tryQual
* @param tryQualifier qualifier to test
* @return true if the qualifier matches
*/
- public boolean qualifierMatch(byte [] tryQualifier) {
+ public boolean qualifierMatch(byte[] tryQualifier) {
if (qualifier != null) {
if (qualifierStartsWith && qualifierEndsWith) {
@@ -78,7 +77,7 @@ public boolean qualifierMatch(byte [] tryQualifier) {
* @param tryColumFamily column family to test
* @return true if the column family matches the rule
*/
- public boolean columnFamilyMatch(byte [] tryColumFamily) {
+ public boolean columnFamilyMatch(byte[] tryColumFamily) {
if (columnFamily != null) {
return Bytes.equals(this.columnFamily, tryColumFamily);
}
@@ -101,7 +100,7 @@ public boolean tableMatch(TableName tryTable) {
* set the column family for the rule
* @param columnFamily column family to set
*/
- public void setColumnFamily(byte [] columnFamily) {
+ public void setColumnFamily(byte[] columnFamily) {
this.columnFamily = columnFamily;
}
@@ -109,7 +108,7 @@ public void setColumnFamily(byte [] columnFamily) {
* set the qualifier value for the rule
* @param qualifier qualifier to set
*/
- public void setQualifier(byte []qualifier) {
+ public void setQualifier(byte[] qualifier) {
this.qualifier = qualifier;
if (startsWith(qualifier, ast)) {
qualifierEndsWith = true;
@@ -129,11 +128,11 @@ public void setQualifier(byte []qualifier) {
/**
* Tests if data starts with startsWith
- * @param data byte array to test
+ * @param data byte array to test
* @param startsWith array that we want to see if data starts with
* @return true if data starts with startsWith
*/
- public static boolean startsWith(byte [] data, byte [] startsWith) {
+ public static boolean startsWith(byte[] data, byte[] startsWith) {
if (startsWith.length > data.length) {
return false;
}
@@ -152,11 +151,11 @@ public static boolean startsWith(byte [] data, byte [] startsWith) {
/**
* Tests if data ends with endsWith
- * @param data byte array to test
+ * @param data byte array to test
* @param endsWith array that we want to see if data ends with
* @return true if data ends with endsWith
*/
- public static boolean endsWith(byte [] data, byte [] endsWith) {
+ public static boolean endsWith(byte[] data, byte[] endsWith) {
if (endsWith.length > data.length) {
return false;
}
@@ -168,7 +167,7 @@ public static boolean endsWith(byte [] data, byte [] endsWith) {
int endStart = data.length - endsWith.length;
for (int i = 0; i < endsWith.length; i++) {
- //if (endsWith[i]!=data[(data.length-1)-(endsWith.length+i)]){
+ // if (endsWith[i]!=data[(data.length-1)-(endsWith.length+i)]){
if (endsWith[i] != data[endStart + i]) {
return false;
}
@@ -202,13 +201,11 @@ public byte[] getColumnFamily() {
/**
* get the qualifier for the rule
- * @return qualfier
*/
public byte[] getQualifier() {
return qualifier;
}
-
/**
* indicates if the qualfier is a wildcard like *foo
* @return true if rule is like *foo
diff --git a/kafka/hbase-kafka-proxy/src/main/java/org/apache/hadoop/hbase/kafka/TopicRoutingRules.java b/kafka/hbase-kafka-proxy/src/main/java/org/apache/hadoop/hbase/kafka/TopicRoutingRules.java
index c8b818c6..e4e9312f 100644
--- a/kafka/hbase-kafka-proxy/src/main/java/org/apache/hadoop/hbase/kafka/TopicRoutingRules.java
+++ b/kafka/hbase-kafka-proxy/src/main/java/org/apache/hadoop/hbase/kafka/TopicRoutingRules.java
@@ -1,4 +1,4 @@
-/**
+/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
@@ -6,9 +6,9 @@
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
@@ -17,7 +17,6 @@
*/
package org.apache.hadoop.hbase.kafka;
-
import java.io.File;
import java.io.FileInputStream;
import java.io.InputStream;
@@ -25,7 +24,6 @@
import java.util.List;
import javax.xml.parsers.DocumentBuilder;
import javax.xml.parsers.DocumentBuilderFactory;
-
import org.apache.hadoop.hbase.TableName;
import org.apache.hadoop.hbase.util.Bytes;
import org.apache.yetus.audience.InterfaceAudience;
@@ -34,41 +32,20 @@
import org.w3c.dom.NodeList;
/**
- * The topic routing/drop rules.
- *
- * <rules>
- * <rule .... />
- *
- * </rules>
- *
- *
- *
- * A wildcard can only be at the beginning or at the end (can be at both sides).
- *
- * drop rules are always evaluated first.
- *
- * drop examples:
- * <rule action="drop" table="default:MyTable" />
- * Do not send replication events for table MyTable
- *
- * <rule action="drop" table="default:MyTable" columnFamily="data"/>
- * Do not send replication events for table MyTable's column family data
- *
- * <rule action="drop" table="default:MyTable" columnFamily="data" qualfier="dhold:*"/>
- * Do not send replication events for any qualiifier on table MyTable with column family data
- *
- * routeRules examples:
- *
- * <rule action="routeRules" table="default:MyTable" topic="mytopic"/>
- * routeRules all replication events for table default:Mytable to topic mytopic
- *
- * <rule action="routeRules" table="default:MyTable" columnFamily="data" topic="mytopic"/>
- * routeRules all replication events for table default:Mytable column family data to topic mytopic
- *
- * <rule action="routeRules" table="default:MyTable" columnFamily="data" topic="mytopic"
- * qualifier="hold:*"/>
- * routeRules all replication events for qualifiers that start with hold: for table
- * default:Mytable column family data to topic mytopic
+ * The topic routing/drop rules. <rules> <rule .... /> </rules> A wildcard can
+ * only be at the beginning or at the end (can be at both sides). drop rules are always evaluated
+ * first. drop examples: <rule action="drop" table="default:MyTable" /> Do not send
+ * replication events for table MyTable <rule action="drop" table="default:MyTable"
+ * columnFamily="data"/> Do not send replication events for table MyTable's column family data
+ * <rule action="drop" table="default:MyTable" columnFamily="data" qualfier="dhold:*"/> Do not
+ * send replication events for any qualiifier on table MyTable with column family data routeRules
+ * examples: <rule action="routeRules" table="default:MyTable" topic="mytopic"/> routeRules
+ * all replication events for table default:Mytable to topic mytopic <rule action="routeRules"
+ * table="default:MyTable" columnFamily="data" topic="mytopic"/> routeRules all replication
+ * events for table default:Mytable column family data to topic mytopic <rule action="routeRules"
+ * table="default:MyTable" columnFamily="data" topic="mytopic" qualifier="hold:*"/> routeRules
+ * all replication events for qualifiers that start with hold: for table default:Mytable column
+ * family data to topic mytopic
*/
@InterfaceAudience.Private
public class TopicRoutingRules {
@@ -100,7 +77,7 @@ public TopicRoutingRules(File source) throws Exception {
* @throws Exception error loading rule set
*/
public void reloadIfFile() throws Exception {
- if (this.sourceFile!=null){
+ if (this.sourceFile != null) {
List<DropRule> dropRulesSave = this.dropRules;
List<TopicRule> routeRulesSave = this.routeRules;
@@ -108,15 +85,15 @@ public void reloadIfFile() throws Exception {
List<DropRule> dropRulesNew = new ArrayList<>();
List<TopicRule> routeRulesNew = new ArrayList<>();
- parseRules(fin,dropRulesNew,routeRulesNew);
+ parseRules(fin, dropRulesNew, routeRulesNew);
this.dropRules = dropRulesNew;
this.routeRules = routeRulesNew;
- } catch (Exception e){
+ } catch (Exception e) {
// roll back
- this.dropRules=dropRulesSave;
- this.routeRules=routeRulesSave;
+ this.dropRules = dropRulesSave;
+ this.routeRules = routeRulesSave;
// re-throw
throw e;
}
@@ -130,18 +107,18 @@ public void reloadIfFile() throws Exception {
public void parseRules(InputStream input) {
List<DropRule> dropRulesNew = new ArrayList<>();
List<TopicRule> routeRulesNew = new ArrayList<>();
- parseRules(input,dropRulesNew,routeRulesNew);
+ parseRules(input, dropRulesNew, routeRulesNew);
this.dropRules = dropRulesNew;
this.routeRules = routeRulesNew;
}
/**
* Parse the XML in the InputStream into route/drop rules and store them in the passed in Lists
- * @param input inputstream the contains the ruleset
- * @param dropRules list to accumulate drop rules
+ * @param input inputstream that contains the ruleset
+ * @param dropRules list to accumulate drop rules
* @param routeRules list to accumulate route rules
*/
- public void parseRules(InputStream input,List<DropRule> dropRules, List<TopicRule> routeRules) {
+ public void parseRules(InputStream input, List<DropRule> dropRules, List<TopicRule> routeRules) {
try {
DocumentBuilderFactory dbFactory = DocumentBuilderFactory.newInstance();
DocumentBuilder dBuilder = dbFactory.newDocumentBuilder();
@@ -149,7 +126,7 @@ public void parseRules(InputStream input,List<DropRule> dropRules, List<TopicRule> routeRules) {
@@ -186,13 +163,13 @@ public void parseRule(Element n, List<DropRule> dropRules, List<TopicRule> route
/**
* Indicates if a cell mutation should be dropped instead of routed to kafka.
- * @param table table name to check
+ * @param table table name to check
* @param columnFamily column family to check
- * @param qualifer qualifier name to check
+ * @param qualifer qualifier name to check
* @return if the mutation should be dropped instead of routed to Kafka
*/
- public boolean isExclude(final TableName table, final byte []columnFamily,
- final byte[] qualifer) {
+ public boolean isExclude(final TableName table, final byte[] columnFamily,
+ final byte[] qualifer) {
for (DropRule r : getDropRules()) {
if (r.match(table, columnFamily, qualifer)) {
return true;
@@ -203,12 +180,12 @@ public boolean isExclude(final TableName table, final byte []columnFamily,
/**
* Get topics for the table/column family/qualifier combination
- * @param table table name to check
+ * @param table table name to check
* @param columnFamily column family to check
- * @param qualifer qualifier name to check
+ * @param qualifer qualifier name to check
* @return list of topics that match the passed in values (or empty for none).
*/
- public List<String> getTopics(TableName table, byte []columnFamily, byte []qualifer) {
+ public List<String> getTopics(TableName table, byte[] columnFamily, byte[] qualifer) {
List<String> ret = new ArrayList<>();
for (TopicRule r : getRouteRules()) {
if (r.match(table, columnFamily, qualifer)) {
diff --git a/kafka/hbase-kafka-proxy/src/main/java/org/apache/hadoop/hbase/kafka/TopicRule.java b/kafka/hbase-kafka-proxy/src/main/java/org/apache/hadoop/hbase/kafka/TopicRule.java
index 5e5b6bfd..fbb6e74c 100644
--- a/kafka/hbase-kafka-proxy/src/main/java/org/apache/hadoop/hbase/kafka/TopicRule.java
+++ b/kafka/hbase-kafka-proxy/src/main/java/org/apache/hadoop/hbase/kafka/TopicRule.java
@@ -1,4 +1,4 @@
-/**
+/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
@@ -6,9 +6,9 @@
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
@@ -21,7 +21,6 @@
import java.util.HashSet;
import java.util.Set;
import java.util.stream.Collectors;
-
import org.apache.yetus.audience.InterfaceAudience;
/**
diff --git a/kafka/hbase-kafka-proxy/src/test/java/org/apache/hadoop/hbase/kafka/ProducerForTesting.java b/kafka/hbase-kafka-proxy/src/test/java/org/apache/hadoop/hbase/kafka/ProducerForTesting.java
index e800501b..93c0cd44 100644
--- a/kafka/hbase-kafka-proxy/src/test/java/org/apache/hadoop/hbase/kafka/ProducerForTesting.java
+++ b/kafka/hbase-kafka-proxy/src/test/java/org/apache/hadoop/hbase/kafka/ProducerForTesting.java
@@ -1,16 +1,19 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one or more contributor license
- * agreements. See the NOTICE file distributed with this work for additional information regarding
- * copyright ownership. The ASF licenses this file to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance with the License. You may obtain a
- * copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software distributed under the License
- * is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express
- * or implied. See the License for the specific language governing permissions and limitations under
- * the License.
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
*/
package org.apache.hadoop.hbase.kafka;
@@ -19,7 +22,6 @@
import java.util.List;
import java.util.Map;
import java.util.concurrent.Future;
-
import org.apache.avro.io.BinaryDecoder;
import org.apache.avro.io.DecoderFactory;
import org.apache.avro.specific.SpecificDatumReader;
diff --git a/kafka/hbase-kafka-proxy/src/test/java/org/apache/hadoop/hbase/kafka/TestDropRule.java b/kafka/hbase-kafka-proxy/src/test/java/org/apache/hadoop/hbase/kafka/TestDropRule.java
index e10bb043..1c583826 100644
--- a/kafka/hbase-kafka-proxy/src/test/java/org/apache/hadoop/hbase/kafka/TestDropRule.java
+++ b/kafka/hbase-kafka-proxy/src/test/java/org/apache/hadoop/hbase/kafka/TestDropRule.java
@@ -1,22 +1,24 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one or more contributor license
- * agreements. See the NOTICE file distributed with this work for additional information regarding
- * copyright ownership. The ASF licenses this file to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance with the License. You may obtain a
- * copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software distributed under the License
- * is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express
- * or implied. See the License for the specific language governing permissions and limitations under
- * the License.
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
*/
package org.apache.hadoop.hbase.kafka;
import java.io.ByteArrayInputStream;
import java.nio.charset.StandardCharsets;
-
import org.apache.hadoop.hbase.TableName;
import org.apache.hadoop.hbase.testclassification.SmallTests;
import org.apache.hadoop.hbase.util.Bytes;
@@ -31,24 +33,19 @@
@Category(SmallTests.class)
public class TestDropRule {
private static final String DROP_RULE1 =
- "";
+ "";
private static final String DROP_RULE2 =
- "";
- private static final String DROP_RULE3 =
- "";
+ "";
+ private static final String DROP_RULE3 = "";
- private static final String DROP_RULE4 =
- "";
- private static final String DROP_RULE5 =
- "";
+ private static final String DROP_RULE4 = "";
+ private static final String DROP_RULE5 = "";
- private static final String DROP_RULE6 =
- "";
+ private static final String DROP_RULE6 = "";
@Test
public void testDropies1() {
@@ -75,7 +72,7 @@ public void testDropies2() {
Assert.assertEquals(TableName.valueOf("default:MyTable"),
rules.getDropRules().get(0).getTableName());
Assert.assertTrue(Bytes.equals("data".getBytes(StandardCharsets.UTF_8),
- rules.getDropRules().get(0).getColumnFamily()));
+ rules.getDropRules().get(0).getColumnFamily()));
Assert.assertNull(rules.getDropRules().get(0).getQualifier());
Assert.assertEquals(0, rules.getRouteRules().size());
} catch (Exception e) {
@@ -92,9 +89,9 @@ public void testDropies3() {
Assert.assertEquals(TableName.valueOf("default:MyTable"),
rules.getDropRules().get(0).getTableName());
Assert.assertTrue(Bytes.equals("data".getBytes(StandardCharsets.UTF_8),
- rules.getDropRules().get(0).getColumnFamily()));
+ rules.getDropRules().get(0).getColumnFamily()));
Assert.assertTrue(Bytes.equals("dhold".getBytes(StandardCharsets.UTF_8),
- rules.getDropRules().get(0).getQualifier()));
+ rules.getDropRules().get(0).getQualifier()));
Assert.assertEquals(0, rules.getRouteRules().size());
} catch (Exception e) {
Assert.fail(e.getMessage());
@@ -110,19 +107,18 @@ public void testDropies4() {
Assert.assertEquals(TableName.valueOf("default:MyTable"),
rules.getDropRules().get(0).getTableName());
Assert.assertTrue(Bytes.equals("data".getBytes(StandardCharsets.UTF_8),
- rules.getDropRules().get(0).getColumnFamily()));
+ rules.getDropRules().get(0).getColumnFamily()));
Assert.assertTrue(Bytes.equals("dhold:".getBytes(StandardCharsets.UTF_8),
- rules.getDropRules().get(0).getQualifier()));
+ rules.getDropRules().get(0).getQualifier()));
Assert.assertEquals(0, rules.getRouteRules().size());
DropRule drop = rules.getDropRules().get(0);
Assert.assertFalse(drop.match(TableName.valueOf("default:MyTable"),
- "data".getBytes(StandardCharsets.UTF_8), "blah".getBytes(StandardCharsets.UTF_8)));
+ "data".getBytes(StandardCharsets.UTF_8), "blah".getBytes(StandardCharsets.UTF_8)));
Assert.assertFalse(drop.match(TableName.valueOf("default:MyTable"),
- "data".getBytes(StandardCharsets.UTF_8), "dholdme".getBytes(StandardCharsets.UTF_8)));
+ "data".getBytes(StandardCharsets.UTF_8), "dholdme".getBytes(StandardCharsets.UTF_8)));
Assert.assertTrue(drop.match(TableName.valueOf("default:MyTable"),
- "data".getBytes(StandardCharsets.UTF_8),
- "dhold:me".getBytes(StandardCharsets.UTF_8)));
+ "data".getBytes(StandardCharsets.UTF_8), "dhold:me".getBytes(StandardCharsets.UTF_8)));
} catch (Exception e) {
Assert.fail(e.getMessage());
}
@@ -137,20 +133,19 @@ public void testDropies5() {
Assert.assertEquals(TableName.valueOf("default:MyTable"),
rules.getDropRules().get(0).getTableName());
Assert.assertTrue(Bytes.equals("data".getBytes(StandardCharsets.UTF_8),
- rules.getDropRules().get(0).getColumnFamily()));
+ rules.getDropRules().get(0).getColumnFamily()));
Assert.assertTrue(Bytes.equals("pickme".getBytes(StandardCharsets.UTF_8),
- rules.getDropRules().get(0).getQualifier()));
+ rules.getDropRules().get(0).getQualifier()));
Assert.assertEquals(0, rules.getRouteRules().size());
DropRule drop = rules.getDropRules().get(0);
Assert.assertFalse(drop.match(TableName.valueOf("default:MyTable"),
- "data".getBytes(StandardCharsets.UTF_8), "blah".getBytes(StandardCharsets.UTF_8)));
+ "data".getBytes(StandardCharsets.UTF_8), "blah".getBytes(StandardCharsets.UTF_8)));
Assert.assertFalse(drop.match(TableName.valueOf("default:MyTable"),
- "data".getBytes(StandardCharsets.UTF_8),
- "blacickme".getBytes(StandardCharsets.UTF_8)));
- Assert.assertTrue(drop.match(TableName.valueOf("default:MyTable"),
- "data".getBytes(StandardCharsets.UTF_8),
- "hithere.pickme".getBytes(StandardCharsets.UTF_8)));
+ "data".getBytes(StandardCharsets.UTF_8), "blacickme".getBytes(StandardCharsets.UTF_8)));
+ Assert.assertTrue(
+ drop.match(TableName.valueOf("default:MyTable"), "data".getBytes(StandardCharsets.UTF_8),
+ "hithere.pickme".getBytes(StandardCharsets.UTF_8)));
} catch (Exception e) {
Assert.fail(e.getMessage());
}
@@ -163,31 +158,30 @@ public void testDropies6() {
rules.parseRules(new ByteArrayInputStream(DROP_RULE6.getBytes(StandardCharsets.UTF_8)));
Assert.assertEquals(1, rules.getDropRules().size());
Assert.assertEquals(TableName.valueOf("default:MyTable"),
- rules.getDropRules().get(0).getTableName());
+ rules.getDropRules().get(0).getTableName());
Assert.assertTrue(Bytes.equals("data".getBytes(StandardCharsets.UTF_8),
- rules.getDropRules().get(0).getColumnFamily()));
+ rules.getDropRules().get(0).getColumnFamily()));
Assert.assertTrue(Bytes.equals("pickme".getBytes(StandardCharsets.UTF_8),
- rules.getDropRules().get(0).getQualifier()));
+ rules.getDropRules().get(0).getQualifier()));
Assert.assertEquals(0, rules.getRouteRules().size());
DropRule drop = rules.getDropRules().get(0);
Assert.assertFalse(drop.match(TableName.valueOf("default:MyTable"),
- "data".getBytes(StandardCharsets.UTF_8), "blah".getBytes(StandardCharsets.UTF_8)));
+ "data".getBytes(StandardCharsets.UTF_8), "blah".getBytes(StandardCharsets.UTF_8)));
Assert.assertFalse(drop.match(TableName.valueOf("default:MyTable"),
- "data".getBytes(StandardCharsets.UTF_8),
- "blacickme".getBytes(StandardCharsets.UTF_8)));
- Assert.assertTrue(drop.match(TableName.valueOf("default:MyTable"),
- "data".getBytes(StandardCharsets.UTF_8),
- "hithere.pickme".getBytes(StandardCharsets.UTF_8)));
- Assert.assertTrue(drop.match(TableName.valueOf("default:MyTable"),
- "data".getBytes(StandardCharsets.UTF_8),
- "pickme.pleaze.do.it".getBytes(StandardCharsets.UTF_8)));
- Assert.assertFalse(drop.match(TableName.valueOf("default:MyTable"),
- "data".getBytes(StandardCharsets.UTF_8),
- "please.pickme.pleaze".getBytes(StandardCharsets.UTF_8)));
- Assert.assertTrue(drop.match(TableName.valueOf("default:MyTable"),
- "data".getBytes(StandardCharsets.UTF_8),
- "pickme.pleaze.pickme".getBytes(StandardCharsets.UTF_8)));
+ "data".getBytes(StandardCharsets.UTF_8), "blacickme".getBytes(StandardCharsets.UTF_8)));
+ Assert.assertTrue(
+ drop.match(TableName.valueOf("default:MyTable"), "data".getBytes(StandardCharsets.UTF_8),
+ "hithere.pickme".getBytes(StandardCharsets.UTF_8)));
+ Assert.assertTrue(
+ drop.match(TableName.valueOf("default:MyTable"), "data".getBytes(StandardCharsets.UTF_8),
+ "pickme.pleaze.do.it".getBytes(StandardCharsets.UTF_8)));
+ Assert.assertFalse(
+ drop.match(TableName.valueOf("default:MyTable"), "data".getBytes(StandardCharsets.UTF_8),
+ "please.pickme.pleaze".getBytes(StandardCharsets.UTF_8)));
+ Assert.assertTrue(
+ drop.match(TableName.valueOf("default:MyTable"), "data".getBytes(StandardCharsets.UTF_8),
+ "pickme.pleaze.pickme".getBytes(StandardCharsets.UTF_8)));
} catch (Exception e) {
Assert.fail(e.getMessage());
}
diff --git a/kafka/hbase-kafka-proxy/src/test/java/org/apache/hadoop/hbase/kafka/TestProcessMutations.java b/kafka/hbase-kafka-proxy/src/test/java/org/apache/hadoop/hbase/kafka/TestProcessMutations.java
index 534d4a63..177027e2 100644
--- a/kafka/hbase-kafka-proxy/src/test/java/org/apache/hadoop/hbase/kafka/TestProcessMutations.java
+++ b/kafka/hbase-kafka-proxy/src/test/java/org/apache/hadoop/hbase/kafka/TestProcessMutations.java
@@ -1,4 +1,4 @@
-/**
+/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
@@ -6,9 +6,9 @@
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
@@ -21,7 +21,6 @@
import java.nio.charset.StandardCharsets;
import java.util.ArrayList;
import java.util.List;
-
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.hbase.TableName;
import org.apache.hadoop.hbase.client.Put;
@@ -39,14 +38,13 @@
@Category(SmallTests.class)
public class TestProcessMutations {
private static final String ROUTE_RULE1 =
- "";
+ "";
ProducerForTesting myTestingProducer;
@Before
public void setup() {
- this.myTestingProducer=new ProducerForTesting();
+ this.myTestingProducer = new ProducerForTesting();
}
@Test
@@ -55,22 +53,22 @@ public void testSendMessage() {
try {
rules.parseRules(new ByteArrayInputStream(ROUTE_RULE1.getBytes(StandardCharsets.UTF_8)));
Configuration conf = new Configuration();
- KafkaBridgeConnection connection = new KafkaBridgeConnection(conf,rules,myTestingProducer);
+ KafkaBridgeConnection connection = new KafkaBridgeConnection(conf, rules, myTestingProducer);
long zeTimestamp = System.currentTimeMillis();
- Put put = new Put("key1".getBytes(StandardCharsets.UTF_8),zeTimestamp);
+ Put put = new Put("key1".getBytes(StandardCharsets.UTF_8), zeTimestamp);
put.addColumn("FAMILY".getBytes(StandardCharsets.UTF_8),
- "not foo".getBytes(StandardCharsets.UTF_8),
- "VALUE should NOT pass".getBytes(StandardCharsets.UTF_8));
+ "not foo".getBytes(StandardCharsets.UTF_8),
+ "VALUE should NOT pass".getBytes(StandardCharsets.UTF_8));
put.addColumn("FAMILY".getBytes(StandardCharsets.UTF_8),
- "foo".getBytes(StandardCharsets.UTF_8),
- "VALUE should pass".getBytes(StandardCharsets.UTF_8));
+ "foo".getBytes(StandardCharsets.UTF_8),
+ "VALUE should pass".getBytes(StandardCharsets.UTF_8));
Table myTable = connection.getTable(TableName.valueOf("MyNamespace:MyTable"));
List<Row> rows = new ArrayList<>();
rows.add(put);
- myTable.batch(rows,new Object[0]);
+ myTable.batch(rows, new Object[0]);
Assert.assertFalse(myTestingProducer.getMessages().isEmpty());
- } catch (Exception e){
+ } catch (Exception e) {
Assert.fail(e.getMessage());
}
}
diff --git a/kafka/hbase-kafka-proxy/src/test/java/org/apache/hadoop/hbase/kafka/TestQualifierMatching.java b/kafka/hbase-kafka-proxy/src/test/java/org/apache/hadoop/hbase/kafka/TestQualifierMatching.java
index c965f123..229a43fc 100644
--- a/kafka/hbase-kafka-proxy/src/test/java/org/apache/hadoop/hbase/kafka/TestQualifierMatching.java
+++ b/kafka/hbase-kafka-proxy/src/test/java/org/apache/hadoop/hbase/kafka/TestQualifierMatching.java
@@ -1,21 +1,23 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one or more contributor license
- * agreements. See the NOTICE file distributed with this work for additional information regarding
- * copyright ownership. The ASF licenses this file to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance with the License. You may obtain a
- * copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software distributed under the License
- * is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express
- * or implied. See the License for the specific language governing permissions and limitations under
- * the License.
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
*/
package org.apache.hadoop.hbase.kafka;
import java.nio.charset.StandardCharsets;
-
import org.apache.hadoop.hbase.testclassification.SmallTests;
import org.junit.Assert;
import org.junit.Test;
diff --git a/kafka/hbase-kafka-proxy/src/test/java/org/apache/hadoop/hbase/kafka/TestRouteRules.java b/kafka/hbase-kafka-proxy/src/test/java/org/apache/hadoop/hbase/kafka/TestRouteRules.java
index 0c2f184d..6705f3ec 100644
--- a/kafka/hbase-kafka-proxy/src/test/java/org/apache/hadoop/hbase/kafka/TestRouteRules.java
+++ b/kafka/hbase-kafka-proxy/src/test/java/org/apache/hadoop/hbase/kafka/TestRouteRules.java
@@ -1,26 +1,27 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one or more contributor license
- * agreements. See the NOTICE file distributed with this work for additional information regarding
- * copyright ownership. The ASF licenses this file to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance with the License. You may obtain a
- * copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software distributed under the License
- * is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express
- * or implied. See the License for the specific language governing permissions and limitations under
- * the License.
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
*/
package org.apache.hadoop.hbase.kafka;
import java.io.ByteArrayInputStream;
import java.nio.charset.StandardCharsets;
-
import org.apache.hadoop.hbase.TableName;
import org.apache.hadoop.hbase.testclassification.SmallTests;
import org.apache.hadoop.hbase.util.Bytes;
-
import org.junit.Assert;
import org.junit.Test;
import org.junit.experimental.categories.Category;
@@ -33,24 +34,23 @@ public class TestRouteRules {
private static final String TEST_TABLE = "default:MyTable";
private static final String ROUTE_RULE1 =
- "";
+ "";
private static final String ROUTE_RULE2 =
- "";
private static final String ROUTE_RULE3 =
- "";
private static final String ROUTE_RULE4 =
- "";
private static final String ROUTE_RULE5 =
- "";
private static final String ROUTE_RULE6 =
- "";
@Test
@@ -77,9 +77,8 @@ public void testTopic2() {
Assert.assertEquals(1, rules.getRouteRules().size());
Assert.assertEquals(TableName.valueOf(TEST_TABLE),
rules.getRouteRules().get(0).getTableName());
- Assert.assertTrue(
- Bytes.equals("data".getBytes(StandardCharsets.UTF_8),
- rules.getRouteRules().get(0).getColumnFamily()));
+ Assert.assertTrue(Bytes.equals("data".getBytes(StandardCharsets.UTF_8),
+ rules.getRouteRules().get(0).getColumnFamily()));
Assert.assertNull(rules.getRouteRules().get(0).getQualifier());
Assert.assertEquals(0, rules.getDropRules().size());
} catch (Exception e) {
@@ -96,9 +95,9 @@ public void testTopic3() {
Assert.assertEquals(TableName.valueOf(TEST_TABLE),
rules.getRouteRules().get(0).getTableName());
Assert.assertTrue(Bytes.equals("data".getBytes(StandardCharsets.UTF_8),
- rules.getRouteRules().get(0).getColumnFamily()));
+ rules.getRouteRules().get(0).getColumnFamily()));
Assert.assertTrue(Bytes.equals("dhold".getBytes(StandardCharsets.UTF_8),
- rules.getRouteRules().get(0).getQualifier()));
+ rules.getRouteRules().get(0).getQualifier()));
Assert.assertTrue(rules.getRouteRules().get(0).getTopics().contains("foo"));
Assert.assertEquals(1, rules.getRouteRules().get(0).getTopics().size());
@@ -117,23 +116,18 @@ public void testTopic4() {
Assert.assertEquals(TableName.valueOf(TEST_TABLE),
rules.getRouteRules().get(0).getTableName());
Assert.assertTrue(Bytes.equals("data".getBytes(StandardCharsets.UTF_8),
- rules.getRouteRules().get(0).getColumnFamily()));
+ rules.getRouteRules().get(0).getColumnFamily()));
Assert.assertTrue(Bytes.equals("dhold:".getBytes(StandardCharsets.UTF_8),
- rules.getRouteRules().get(0).getQualifier()));
+ rules.getRouteRules().get(0).getQualifier()));
Assert.assertEquals(0, rules.getDropRules().size());
TopicRule route = rules.getRouteRules().get(0);
- Assert.assertFalse(
- route.match(TableName.valueOf(TEST_TABLE),
- "data".getBytes(StandardCharsets.UTF_8),
- "blah".getBytes(StandardCharsets.UTF_8)));
- Assert.assertFalse(
- route.match(TableName.valueOf(TEST_TABLE),
- "data".getBytes(StandardCharsets.UTF_8),
- "dholdme".getBytes(StandardCharsets.UTF_8)));
+ Assert.assertFalse(route.match(TableName.valueOf(TEST_TABLE),
+ "data".getBytes(StandardCharsets.UTF_8), "blah".getBytes(StandardCharsets.UTF_8)));
+ Assert.assertFalse(route.match(TableName.valueOf(TEST_TABLE),
+ "data".getBytes(StandardCharsets.UTF_8), "dholdme".getBytes(StandardCharsets.UTF_8)));
Assert.assertTrue(route.match(TableName.valueOf(TEST_TABLE),
- "data".getBytes(StandardCharsets.UTF_8),
- "dhold:me".getBytes(StandardCharsets.UTF_8)));
+ "data".getBytes(StandardCharsets.UTF_8), "dhold:me".getBytes(StandardCharsets.UTF_8)));
} catch (Exception e) {
Assert.fail(e.getMessage());
@@ -149,22 +143,19 @@ public void testTopic5() {
Assert.assertEquals(TableName.valueOf(TEST_TABLE),
rules.getRouteRules().get(0).getTableName());
Assert.assertTrue(Bytes.equals("data".getBytes(StandardCharsets.UTF_8),
- rules.getRouteRules().get(0).getColumnFamily()));
+ rules.getRouteRules().get(0).getColumnFamily()));
Assert.assertTrue(Bytes.equals("pickme".getBytes(StandardCharsets.UTF_8),
- rules.getRouteRules().get(0).getQualifier()));
+ rules.getRouteRules().get(0).getQualifier()));
Assert.assertEquals(0, rules.getDropRules().size());
TopicRule route = rules.getRouteRules().get(0);
- Assert.assertFalse(
- route.match(TableName.valueOf(TEST_TABLE),
- "data".getBytes(StandardCharsets.UTF_8),
- "blah".getBytes(StandardCharsets.UTF_8)));
Assert.assertFalse(route.match(TableName.valueOf(TEST_TABLE),
- "data".getBytes(StandardCharsets.UTF_8),
- "blacickme".getBytes(StandardCharsets.UTF_8)));
- Assert.assertTrue(route.match(TableName.valueOf(TEST_TABLE),
- "data".getBytes(StandardCharsets.UTF_8),
- "hithere.pickme".getBytes(StandardCharsets.UTF_8)));
+ "data".getBytes(StandardCharsets.UTF_8), "blah".getBytes(StandardCharsets.UTF_8)));
+ Assert.assertFalse(route.match(TableName.valueOf(TEST_TABLE),
+ "data".getBytes(StandardCharsets.UTF_8), "blacickme".getBytes(StandardCharsets.UTF_8)));
+ Assert.assertTrue(
+ route.match(TableName.valueOf(TEST_TABLE), "data".getBytes(StandardCharsets.UTF_8),
+ "hithere.pickme".getBytes(StandardCharsets.UTF_8)));
} catch (Exception e) {
Assert.fail(e.getMessage());
@@ -180,31 +171,28 @@ public void testTopic6() {
Assert.assertEquals(TableName.valueOf(TEST_TABLE),
rules.getRouteRules().get(0).getTableName());
Assert.assertTrue(Bytes.equals("data".getBytes(StandardCharsets.UTF_8),
- rules.getRouteRules().get(0).getColumnFamily()));
+ rules.getRouteRules().get(0).getColumnFamily()));
Assert.assertTrue(Bytes.equals("pickme".getBytes(StandardCharsets.UTF_8),
- rules.getRouteRules().get(0).getQualifier()));
+ rules.getRouteRules().get(0).getQualifier()));
Assert.assertEquals(0, rules.getDropRules().size());
TopicRule route = rules.getRouteRules().get(0);
- Assert.assertFalse(
- route.match(TableName.valueOf(TEST_TABLE),
- "data".getBytes(StandardCharsets.UTF_8),
- "blah".getBytes(StandardCharsets.UTF_8)));
Assert.assertFalse(route.match(TableName.valueOf(TEST_TABLE),
- "data".getBytes(StandardCharsets.UTF_8),
- "blacickme".getBytes(StandardCharsets.UTF_8)));
- Assert.assertTrue(route.match(TableName.valueOf(TEST_TABLE),
- "data".getBytes(StandardCharsets.UTF_8),
- "hithere.pickme".getBytes(StandardCharsets.UTF_8)));
- Assert.assertTrue(route.match(TableName.valueOf(TEST_TABLE),
- "data".getBytes(StandardCharsets.UTF_8),
- "pickme.pleaze.do.it".getBytes(StandardCharsets.UTF_8)));
+ "data".getBytes(StandardCharsets.UTF_8), "blah".getBytes(StandardCharsets.UTF_8)));
Assert.assertFalse(route.match(TableName.valueOf(TEST_TABLE),
- "data".getBytes(StandardCharsets.UTF_8),
- "please.pickme.pleaze".getBytes(StandardCharsets.UTF_8)));
- Assert.assertTrue(route.match(TableName.valueOf(TEST_TABLE),
- "data".getBytes(StandardCharsets.UTF_8),
- "pickme.pleaze.pickme".getBytes(StandardCharsets.UTF_8)));
+ "data".getBytes(StandardCharsets.UTF_8), "blacickme".getBytes(StandardCharsets.UTF_8)));
+ Assert.assertTrue(
+ route.match(TableName.valueOf(TEST_TABLE), "data".getBytes(StandardCharsets.UTF_8),
+ "hithere.pickme".getBytes(StandardCharsets.UTF_8)));
+ Assert.assertTrue(
+ route.match(TableName.valueOf(TEST_TABLE), "data".getBytes(StandardCharsets.UTF_8),
+ "pickme.pleaze.do.it".getBytes(StandardCharsets.UTF_8)));
+ Assert.assertFalse(
+ route.match(TableName.valueOf(TEST_TABLE), "data".getBytes(StandardCharsets.UTF_8),
+ "please.pickme.pleaze".getBytes(StandardCharsets.UTF_8)));
+ Assert.assertTrue(
+ route.match(TableName.valueOf(TEST_TABLE), "data".getBytes(StandardCharsets.UTF_8),
+ "pickme.pleaze.pickme".getBytes(StandardCharsets.UTF_8)));
} catch (Exception e) {
Assert.fail(e.getMessage());
diff --git a/kafka/pom.xml b/kafka/pom.xml
index a829e06c..50b9e051 100644
--- a/kafka/pom.xml
+++ b/kafka/pom.xml
@@ -1,6 +1,5 @@
-
-
+
+
<version>${revision}</version>
- <name>Apache HBase Connectors</name>
<packaging>pom</packaging>
- <description>
- Connectors to Apache HBase.
- </description>
+ <name>Apache HBase Connectors</name>
+ <description>Connectors to Apache HBase.</description>
<url>http://hbase.apache.org</url>
<inceptionYear>2018</inceptionYear>
@@ -58,35 +56,7 @@
repo
-
- <module>kafka</module>
- <module>spark</module>
- <module>hbase-connectors-assembly</module>
-
-
- <connection>scm:git:git://gitbox.apache.org/repos/asf/hbase-connectors.git</connection>
- <developerConnection>scm:git:https://gitbox.apache.org/repos/asf/hbase-connectors.git</developerConnection>
- <url>https://gitbox.apache.org/repos/asf?p=hbase-connectors.git;a=summary</url>
-
-
-
-
- <id>releases</id>
- <name>Internal releases</name>
- <url>https://nexus.int.sproutsocial.com/nexus/content/repositories/releases/</url>
-
-
- <id>snapshots</id>
- <name>Internal snapshots</name>
- <url>https://nexus.int.sproutsocial.com/nexus/content/repositories/snapshots/</url>
-
-
-
-
-
- <system>JIRA</system>
- <url>http://issues.apache.org/jira/browse/HBASE</url>
-
+
+ <system>JIRA</system>
+ <url>http://issues.apache.org/jira/browse/HBASE</url>
+
+
+
+ <id>releases</id>
+ <name>Internal releases</name>
+ <url>https://nexus.int.sproutsocial.com/nexus/content/repositories/releases/</url>
+
+
+ <id>snapshots</id>
+ <name>Internal snapshots</name>
+ <url>https://nexus.int.sproutsocial.com/nexus/content/repositories/snapshots/</url>
+
+
- <revision>1.0.1-SNAPSHOT</revision>
+ <revision>1.1.0-SNAPSHOT</revision>
true
yyyy-MM-dd'T'HH:mm
${maven.build.timestamp}
1.8
${compileSource}
+
+ false
+ false
+ false
+ false
+ false
+ 1.4
+
3.5.0
- 2.4.9
+ 2.5.4
1.6.0
0.5.0
- 4.12
- 4.0.1
- 2.8.5
- 3.2.0
+ 4.13.2
+ 1.8.5
+ 4.1.4
+ 2.10.0
+ 3.2.4
${hadoop-three.version}
- 1.7.25
- 1.2.17
- 8.28
- 3.1.2
- 3.0.0-M5
+ 1.7.36
+ 2.17.2
+ 1.2.25
+ 8.45.1
+ 2.27.2
+ 3.2.1
+ 2.5.0
+ 3.0.0
3.0.0
- 1.2
+ 1.5.1
0.14.0
2.5.0
0.5.0
2.11.0
1.7.7
- 3.6
+ 3.12.0
3.10.6.Final
@@ -172,6 +181,10 @@
3.0.1-b08
hbase-hadoop2-compat
false
+ 3.9.1.2184
+ 1.4.11
+ 1.0.0
+ 0.8.8
@@ -195,21 +208,42 @@
<groupId>org.apache.avro</groupId>
<artifactId>avro</artifactId>
<version>${avro.version}</version>
+ <exclusions>
+ <exclusion>
+ <groupId>com.thoughtworks.paranamer</groupId>
+ <artifactId>paranamer</artifactId>
+ </exclusion>
+ </exclusions>
<groupId>org.slf4j</groupId>
- <artifactId>slf4j-log4j12</artifactId>
+ <artifactId>slf4j-api</artifactId>
<version>${slf4j.version}</version>
- <groupId>org.slf4j</groupId>
- <artifactId>slf4j-api</artifactId>
- <version>${slf4j.version}</version>
+ <groupId>org.apache.logging.log4j</groupId>
+ <artifactId>log4j-api</artifactId>
+ <version>${log4j2.version}</version>
+ </dependency>
+ <dependency>
+ <groupId>org.apache.logging.log4j</groupId>
+ <artifactId>log4j-core</artifactId>
+ <version>${log4j2.version}</version>
+ </dependency>
+ <dependency>
+ <groupId>org.apache.logging.log4j</groupId>
+ <artifactId>log4j-slf4j-impl</artifactId>
+ <version>${log4j2.version}</version>
+ </dependency>
+ <dependency>
+ <groupId>org.apache.logging.log4j</groupId>
+ <artifactId>log4j-1.2-api</artifactId>
+ <version>${log4j2.version}</version>
- <groupId>log4j</groupId>
- <artifactId>log4j</artifactId>
- <version>${log4j.version}</version>
+ <groupId>ch.qos.reload4j</groupId>
+ <artifactId>reload4j</artifactId>
+ <version>${reload4j.version}</version>
org.glassfish
@@ -233,6 +267,12 @@
<artifactId>junit</artifactId>
<version>${junit.version}</version>
+ <dependency>
+ <groupId>org.mockito</groupId>
+ <artifactId>mockito-all</artifactId>
+ <version>${mockito-all.version}</version>
+ <scope>test</scope>
+ </dependency>
<groupId>org.apache.hbase.thirdparty</groupId>
<artifactId>hbase-shaded-miscellaneous</artifactId>
@@ -314,8 +354,8 @@
<version>${hbase.version}</version>
- <artifactId>hbase-it</artifactId>
<groupId>org.apache.hbase</groupId>
+ <artifactId>hbase-it</artifactId>
<version>${hbase.version}</version>
<type>test-jar</type>
<scope>test</scope>
@@ -329,13 +369,6 @@
-
-
- <groupId>kr.motd.maven</groupId>
- <artifactId>os-maven-plugin</artifactId>
- <version>${os.maven.version}</version>
-
-
@@ -405,19 +438,19 @@
<groupId>org.codehaus.mojo</groupId>
<artifactId>buildnumber-maven-plugin</artifactId>
- <version>1.4</version>
+ <version>${buildnumber.maven.version}</version>
+ <configuration>
+ <timestampFormat>yyyy</timestampFormat>
+ <timestampPropertyName>build.year</timestampPropertyName>
+ </configuration>
- <phase>validate</phase>
<goal>create-timestamp</goal>
+ <phase>validate</phase>
- <configuration>
- <timestampFormat>yyyy</timestampFormat>
- <timestampPropertyName>build.year</timestampPropertyName>
- </configuration>
[${java.min.version},)
Java is out of date.
HBase requires at least version ${java.min.version} of the JDK to properly build from source.
- See the reference guide on building for more information: http://hbase.apache.org/book.html#build
-
+ See the reference guide on building for more information: http://hbase.apache.org/book.html#build
@@ -543,10 +574,10 @@