diff --git a/hadoop-dragon-core/src/main/java/org/apache/hadoop/realtime/DragonJob.java b/hadoop-dragon-core/src/main/java/org/apache/hadoop/realtime/DragonJob.java
index be8ae6a..4d4eacb 100644
--- a/hadoop-dragon-core/src/main/java/org/apache/hadoop/realtime/DragonJob.java
+++ b/hadoop-dragon-core/src/main/java/org/apache/hadoop/realtime/DragonJob.java
@@ -29,6 +29,8 @@
import org.apache.hadoop.classification.InterfaceStability;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.realtime.conf.DragonConfiguration;
+import org.apache.hadoop.realtime.mr.Mapper;
+import org.apache.hadoop.realtime.mr.Reducer;
import org.apache.hadoop.realtime.records.CounterGroup;
import org.apache.hadoop.realtime.records.Counters;
import org.apache.hadoop.realtime.records.JobId;
@@ -322,11 +324,25 @@ public JobState getState() {
return null;
}
- public void setMapper(Class> clazz){
- conf.setClass(DragonJobConfig.JOB_MAP_CLASS, clazz, Object.class);
+ /**
+ * Set the {@link Mapper} for the job.
+ * @param cls the Mapper
to use
+ * @throws IllegalStateException if the job is submitted
+ */
+ public void setMapperClass(Class extends Mapper> clazz)
+ throws IllegalStateException {
+ ensureState(JobState.NEW);
+ conf.setClass(DragonJobConfig.JOB_MAP_CLASS, clazz, Mapper.class);
}
-
- public void setReducer(Class> clazz){
- conf.setClass(DragonJobConfig.JOB_REDUCE_CLASS, clazz, Object.class);
+
+ /**
+ * Set the {@link Reducer} for the job.
+ * @param cls the Reducer
to use
+ * @throws IllegalStateException if the job is submitted
+ */
+ public void setReducerClass(Class extends Reducer> clazz)
+ throws IllegalStateException {
+ ensureState(JobState.NEW);
+ conf.setClass(DragonJobConfig.JOB_REDUCE_CLASS, clazz, Reducer.class);
}
}
diff --git a/hadoop-dragon-core/src/main/java/org/apache/hadoop/realtime/DragonJobConfig.java b/hadoop-dragon-core/src/main/java/org/apache/hadoop/realtime/DragonJobConfig.java
index 0860ab1..94e416f 100644
--- a/hadoop-dragon-core/src/main/java/org/apache/hadoop/realtime/DragonJobConfig.java
+++ b/hadoop-dragon-core/src/main/java/org/apache/hadoop/realtime/DragonJobConfig.java
@@ -292,11 +292,13 @@ public class DragonJobConfig {
public static final int DEFAULT_JOB_HISTORY_USE_BATCHED_FLUSH_QUEUE_SIZE_THRESHOLD =
50;
- public static final String JOB_MAP_CLASS = "map.class";
+ public static final String JOB_MAP_CLASS = "dragon.map.class";
- public static final String JOB_REDUCE_CLASS = "reduce.class";
+ public static final String JOB_REDUCE_CLASS = "dragon.reduce.class";
- public static final String MAP_PARALLELISM = "map.parallelism";
+ public static final String JOB_OUTPUT_DIR = "dragon.output.dir";
+
+ public static final String MAP_PARALLELISM = "dragon.map.tasks";
- public static final String REDUCE_PARALLELISM = "reduce.parallelism";
+ public static final String REDUCE_PARALLELISM = "dragon.reduce.tasks";
}
diff --git a/hadoop-dragon-core/src/main/java/org/apache/hadoop/realtime/child/ChildExecutor.java b/hadoop-dragon-core/src/main/java/org/apache/hadoop/realtime/child/ChildExecutor.java
index e2f599b..be91059 100644
--- a/hadoop-dragon-core/src/main/java/org/apache/hadoop/realtime/child/ChildExecutor.java
+++ b/hadoop-dragon-core/src/main/java/org/apache/hadoop/realtime/child/ChildExecutor.java
@@ -17,17 +17,219 @@
*/
package org.apache.hadoop.realtime.child;
+import java.io.IOException;
+import java.util.ArrayList;
+import java.util.List;
+import java.util.concurrent.TimeUnit;
+
import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.fs.FileSystem.Statistics;
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.realtime.DragonJobConfig;
+import org.apache.hadoop.realtime.conf.DragonConfiguration;
+import org.apache.hadoop.realtime.event.Event;
+import org.apache.hadoop.realtime.event.EventEmitter;
+import org.apache.hadoop.realtime.event.EventProducer;
+import org.apache.hadoop.realtime.mr.MapContext;
+import org.apache.hadoop.realtime.mr.Mapper;
import org.apache.hadoop.realtime.protocol.DragonChildProtocol;
import org.apache.hadoop.realtime.records.ChildExecutionContext;
+import org.apache.hadoop.realtime.records.Counters;
+import org.apache.hadoop.realtime.records.TaskType;
+import org.apache.hadoop.util.ReflectionUtils;
/**
*/
-public class ChildExecutor {
+abstract class ChildExecutor {
+ // @InterfaceAudience.Private
+ // @InterfaceStability.Unstable
+ // protected class TaskReporter implements StatusReporter, Runnable {
+ // private DragonChildProtocol umbilical;;
+ // private DragonConfiguration conf;
+ // private Progress taskProgress;
+ // private Thread pingThread = null;
+ // private boolean done = true;
+ // private Object lock = new Object();
+ //
+ // TaskReporter(Progress taskProgress, DragonChildProtocol umbilical) {
+ // this.umbilical = umbilical;
+ // this.taskProgress = taskProgress;
+ // }
+ //
+ //
+ // public void setStatus(String status) {
+ // taskProgress.setStatus(normalizeStatus(status, conf));
+ // }
+ //
+ // public Counters.Counter getCounter(String group, String name) {
+ // Counters.Counter counter = null;
+ // if (counters != null) {
+ // counter = counters.findCounter(group, name);
+ // }
+ // return counter;
+ // }
+ //
+ // public Counters.Counter getCounter(Enum> name) {
+ // return counters == null ? null : counters.findCounter(name);
+ // }
+ //
+ // public void incrCounter(Enum key, long amount) {
+ // if (counters != null) {
+ // counters.incrCounter(key, amount);
+ // }
+ // }
+ //
+ // public void incrCounter(String group, String counter, long amount) {
+ // if (counters != null) {
+ // counters.incrCounter(group, counter, amount);
+ // }
+ // if (skipping
+ // && SkipBadRecords.COUNTER_GROUP.equals(group)
+ // && (SkipBadRecords.COUNTER_MAP_PROCESSED_RECORDS.equals(counter) ||
+ // SkipBadRecords.COUNTER_REDUCE_PROCESSED_GROUPS
+ // .equals(counter))) {
+ // // if application reports the processed records, move the
+ // // currentRecStartIndex to the next.
+ // // currentRecStartIndex is the start index which has not yet been
+ // // finished and is still in task's stomach.
+ // for (int i = 0; i < amount; i++) {
+ // currentRecStartIndex = currentRecIndexIterator.next();
+ // }
+ // }
+ // setProgressFlag();
+ // }
+ //
+ // /**
+ // * The communication thread handles communication with the parent (Task
+ // * Tracker). It sends progress updates if progress has been made or if the
+ // * task needs to let the parent know that it's alive. It also pings the
+ // * parent to see if it's alive.
+ // */
+ // public void run() {
+ // final int MAX_RETRIES = 3;
+ // int remainingRetries = MAX_RETRIES;
+ // // get current flag value and reset it as well
+ // boolean sendProgress = resetProgressFlag();
+ // while (!taskDone.get()) {
+ // synchronized (lock) {
+ // done = false;
+ // }
+ // try {
+ // boolean taskFound = true; // whether TT knows about this task
+ // // sleep for a bit
+ // synchronized (lock) {
+ // if (taskDone.get()) {
+ // break;
+ // }
+ // lock.wait(PROGRESS_INTERVAL);
+ // }
+ // if (taskDone.get()) {
+ // break;
+ // }
+ //
+ // if (sendProgress) {
+ // // we need to send progress update
+ // updateCounters();
+ // taskStatus.statusUpdate(taskProgress.get(),
+ // taskProgress.toString(), counters);
+ // taskFound = umbilical.statusUpdate(taskId, taskStatus);
+ // taskStatus.clearStatus();
+ // } else {
+ // // send ping
+ // taskFound = umbilical.ping(taskId);
+ // }
+ //
+ // // if Task Tracker is not aware of our task ID (probably because it
+ // // died and
+ // // came back up), kill ourselves
+ // if (!taskFound) {
+ // LOG.warn("Parent died. Exiting " + taskId);
+ // resetDoneFlag();
+ // System.exit(66);
+ // }
+ //
+ // sendProgress = resetProgressFlag();
+ // remainingRetries = MAX_RETRIES;
+ // } catch (Throwable t) {
+ // LOG.info("Communication exception: "
+ // + StringUtils.stringifyException(t));
+ // remainingRetries -= 1;
+ // if (remainingRetries == 0) {
+ // ReflectionUtils.logThreadInfo(LOG, "Communication exception", 0);
+ // LOG.warn("Last retry, killing " + taskId);
+ // resetDoneFlag();
+ // System.exit(65);
+ // }
+ // }
+ // }
+ // // Notify that we are done with the work
+ // resetDoneFlag();
+ // }
+ //
+ // void resetDoneFlag() {
+ // synchronized (lock) {
+ // done = true;
+ // lock.notify();
+ // }
+ // }
+ //
+ // public void startCommunicationThread() {
+ // if (pingThread == null) {
+ // pingThread = new Thread(this, "communication thread");
+ // pingThread.setDaemon(true);
+ // pingThread.start();
+ // }
+ // }
+ //
+ // public void stopCommunicationThread() throws InterruptedException {
+ // if (pingThread != null) {
+ // // Intent of the lock is to not send an interupt in the middle of an
+ // // umbilical.ping or umbilical.statusUpdate
+ // synchronized (lock) {
+ // // Interrupt if sleeping. Otherwise wait for the RPC call to return.
+ // lock.notify();
+ // }
+ //
+ // synchronized (lock) {
+ // while (!done) {
+ // lock.wait();
+ // }
+ // }
+ // pingThread.interrupt();
+ // pingThread.join();
+ // }
+ // }
+ // }
- public static void run(Configuration conf, DragonChildProtocol proxy,
- ChildExecutionContext context) {
+ /**
+ * Gets a handle to the Statistics instance based on the scheme associated
+ * with path.
+ *
+ * @param path the path.
+ * @param conf the configuration to extract the scheme from if not part of the
+ * path.
+ * @return a Statistics instance, or null if none is found for the scheme.
+ */
+ protected static List getFsStatistics(Path path,
+ Configuration conf) throws IOException {
+ List matchedStats = new ArrayList();
+ path = path.getFileSystem(conf).makeQualified(path);
+ String scheme = path.toUri().getScheme();
+ for (Statistics stats : FileSystem.getAllStatistics()) {
+ if (stats.getScheme().equals(scheme)) {
+ matchedStats.add(stats);
+ }
+ }
+ return matchedStats;
+ }
-
+ protected void execute(
+ final Configuration conf, final DragonChildProtocol proxy,
+ final ChildExecutionContext context) throws IOException,
+ InterruptedException {
+ // make a config helper so we can get the classes
}
+
+
}
diff --git a/hadoop-dragon-core/src/main/java/org/apache/hadoop/realtime/event/EventSource.java b/hadoop-dragon-core/src/main/java/org/apache/hadoop/realtime/child/ChildExecutorFactory.java
similarity index 67%
rename from hadoop-dragon-core/src/main/java/org/apache/hadoop/realtime/event/EventSource.java
rename to hadoop-dragon-core/src/main/java/org/apache/hadoop/realtime/child/ChildExecutorFactory.java
index ed973f6..b3d4a80 100644
--- a/hadoop-dragon-core/src/main/java/org/apache/hadoop/realtime/event/EventSource.java
+++ b/hadoop-dragon-core/src/main/java/org/apache/hadoop/realtime/child/ChildExecutorFactory.java
@@ -15,11 +15,19 @@
* See the License for the specific language governing permissions and
* limitations under the License.
*/
-package org.apache.hadoop.realtime.event;
+package org.apache.hadoop.realtime.child;
+
+import org.apache.hadoop.realtime.records.TaskType;
/**
- * Receives and persists the terminal {@link Event}
*/
-public interface EventSource {
- public void nextEvent();
+public final class ChildExecutorFactory {
+ public static ChildExecutor newExecutor(TaskType type) {
+ if (type == TaskType.MAP)
+ return new MapChildExecutor();
+ else if (type == TaskType.MAP)
+ return new ReduceChildExecutor();
+ else
+ return null; // or throw an exception?
+ }
}
diff --git a/hadoop-dragon-core/src/main/java/org/apache/hadoop/realtime/child/Context.java b/hadoop-dragon-core/src/main/java/org/apache/hadoop/realtime/child/Context.java
new file mode 100644
index 0000000..e7716b6
--- /dev/null
+++ b/hadoop-dragon-core/src/main/java/org/apache/hadoop/realtime/child/Context.java
@@ -0,0 +1,92 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.realtime.child;
+
+import java.io.IOException;
+import java.util.concurrent.TimeUnit;
+
+import org.apache.hadoop.classification.InterfaceAudience;
+import org.apache.hadoop.classification.InterfaceStability;
+import org.apache.hadoop.realtime.conf.DragonConfiguration;
+import org.apache.hadoop.realtime.event.Event;
+import org.apache.hadoop.realtime.mr.Mapper;
+import org.apache.hadoop.realtime.mr.Reducer;
+import org.apache.hadoop.realtime.records.TaskAttemptId;
+import org.apache.hadoop.realtime.records.TaskType;
+
+/**
+ * A context object that allows input and output from the task. It is only
+ * supplied to the {@link Mapper} or {@link Reducer}.
+ * @param the input key type for the task
+ * @param the input value type for the task
+ * @param the output key type for the task
+ * @param the output value type for the task
+ */
+@InterfaceAudience.Public
+@InterfaceStability.Evolving
+public interface Context {
+
+ DragonConfiguration getConfiguration();
+
+ TaskAttemptId getTaskAttemptId();
+
+ /**
+ * return the partition number of this task
+ * @return
+ */
+ int getPartition();
+
+ String getUser();
+
+
+ TaskType getTaskType();
+
+ /**
+ * take a event from the queue.
+ *
+ * @return the current key object or null if there isn't one
+ * @throws IOException
+ * @throws InterruptedException
+ */
+ public Event pollEvent() throws IOException,
+ InterruptedException;
+
+ /**
+ * take a event from event producer
+ *
+ * @return the current key object or null if there isn't one
+ * @throws IOException
+ * @throws InterruptedException
+ */
+ public Event pollEvent(long timeout, TimeUnit unit)
+ throws IOException, InterruptedException;
+
+ /**
+ * Generate an output event
+ */
+ public boolean emitEvent(Event event) throws IOException,
+ InterruptedException;
+
+ /**
+ * Generate an output event
+ */
+ public boolean emitEvent(Event event, long timeout,
+ TimeUnit unit) throws IOException, InterruptedException;
+
+}
diff --git a/hadoop-dragon-core/src/main/java/org/apache/hadoop/realtime/child/DragonChild.java b/hadoop-dragon-core/src/main/java/org/apache/hadoop/realtime/child/DragonChild.java
index 5b70657..c2f837e 100644
--- a/hadoop-dragon-core/src/main/java/org/apache/hadoop/realtime/child/DragonChild.java
+++ b/hadoop-dragon-core/src/main/java/org/apache/hadoop/realtime/child/DragonChild.java
@@ -39,12 +39,12 @@
import org.apache.hadoop.metrics2.lib.DefaultMetricsSystem;
import org.apache.hadoop.metrics2.source.JvmMetrics;
import org.apache.hadoop.realtime.DragonJobConfig;
+import org.apache.hadoop.realtime.conf.DragonConfiguration;
import org.apache.hadoop.realtime.protocol.DragonChildProtocol;
import org.apache.hadoop.realtime.protocol.records.GetTaskRequest;
import org.apache.hadoop.realtime.protocol.records.GetTaskResponse;
import org.apache.hadoop.realtime.records.ChildExecutionContext;
import org.apache.hadoop.realtime.records.TaskAttemptId;
-import org.apache.hadoop.realtime.records.TaskId;
import org.apache.hadoop.realtime.records.TaskReport;
import org.apache.hadoop.realtime.security.TokenCache;
import org.apache.hadoop.realtime.security.token.JobTokenIdentifier;
@@ -75,7 +75,7 @@ class DragonChild {
public static void main(String[] args) throws Throwable {
LOG.debug("Child starting");
- final Configuration defaultConf = new Configuration();
+ final DragonConfiguration defaultConf = new DragonConfiguration();
defaultConf.addResource(DragonJobConfig.JOB_CONF_FILE);
UserGroupInformation.setConfiguration(defaultConf);
@@ -150,7 +150,9 @@ public Object run() throws Exception {
defaultConf.set(DragonJobConfig.WORKING_DIR, workDir);
}
Path workPath = new Path(workDir);
- ChildExecutor.run(defaultConf, amProxy, context); // run the task
+ ChildExecutor executor = ChildExecutorFactory.newExecutor(
+ context.getTaskType());
+ executor.execute(defaultConf, amProxy, context); // run the task
return null;
}
});
diff --git a/hadoop-dragon-core/src/main/java/org/apache/hadoop/realtime/child/MapChildExecutor.java b/hadoop-dragon-core/src/main/java/org/apache/hadoop/realtime/child/MapChildExecutor.java
new file mode 100644
index 0000000..90d9383
--- /dev/null
+++ b/hadoop-dragon-core/src/main/java/org/apache/hadoop/realtime/child/MapChildExecutor.java
@@ -0,0 +1,117 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.realtime.child;
+
+import java.io.IOException;
+import java.util.List;
+import java.util.concurrent.TimeUnit;
+
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.fs.FileSystem.Statistics;
+import org.apache.hadoop.realtime.DragonJobConfig;
+import org.apache.hadoop.realtime.conf.DragonConfiguration;
+import org.apache.hadoop.realtime.event.Event;
+import org.apache.hadoop.realtime.event.EventEmitter;
+import org.apache.hadoop.realtime.event.EventProducer;
+import org.apache.hadoop.realtime.mr.MapContext;
+import org.apache.hadoop.realtime.mr.Mapper;
+import org.apache.hadoop.realtime.protocol.DragonChildProtocol;
+import org.apache.hadoop.realtime.records.ChildExecutionContext;
+import org.apache.hadoop.util.ReflectionUtils;
+
+/**
+ */
+final class MapChildExecutor extends ChildExecutor {
+
+ private static class DirectOutputCollector implements
+ EventEmitter {
+ private final EventEmitter out;
+ private final List fsStats;
+ // counters
+
+ @SuppressWarnings("unchecked")
+ DirectOutputCollector(DragonConfiguration conf) throws IOException,
+ ClassNotFoundException, InterruptedException {
+ this.out = null;
+ String name = conf.get(DragonJobConfig.JOB_OUTPUT_DIR);
+ this.fsStats =
+ getFsStatistics(name == null ? null : new Path(name), conf);
+ }
+
+ @Override
+ public void close() throws IOException {
+ if (out != null) {
+ out.close();
+ }
+ }
+
+ private long getOutputBytes(List stats) {
+ if (stats == null)
+ return 0;
+ long bytesWritten = 0;
+ for (Statistics stat : stats) {
+ bytesWritten = bytesWritten + stat.getBytesWritten();
+ }
+ return bytesWritten;
+ }
+
+ @Override
+ public boolean emitEvent(Event event) throws IOException,
+ InterruptedException {
+ return out.emitEvent(event);
+ }
+
+ @Override
+ public boolean emitEvent(Event event, long timeout, TimeUnit unit)
+ throws IOException, InterruptedException {
+ return out.emitEvent(event, timeout, unit);
+ }
+ }
+
+ @Override
+ @SuppressWarnings("unchecked")
+ protected void execute(
+ final Configuration conf, final DragonChildProtocol proxy,
+ final ChildExecutionContext context) throws IOException,
+ InterruptedException {
+ super.execute(conf, proxy, context);
+
+ // make a mapper
+ Mapper mapper =
+ (Mapper) ReflectionUtils.newInstance(
+ conf.getClass(DragonJobConfig.JOB_MAP_CLASS, Mapper.class), conf);
+ // fetch the input sub-dirs
+
+ // make the event producer
+ EventProducer input = null;
+
+ // if reduce number = 0 , then output = DirectOutputCollector
+ // else output = OutputCollector
+
+ // make a map context
+ MapContext mapContext = null;
+
+ // initialize the event producer
+ // run the mapper
+ mapper.run(mapContext);
+ // close the event producer
+
+ }
+
+}
diff --git a/hadoop-dragon-core/src/main/java/org/apache/hadoop/realtime/child/ReduceChildExecutor.java b/hadoop-dragon-core/src/main/java/org/apache/hadoop/realtime/child/ReduceChildExecutor.java
new file mode 100644
index 0000000..e018f86
--- /dev/null
+++ b/hadoop-dragon-core/src/main/java/org/apache/hadoop/realtime/child/ReduceChildExecutor.java
@@ -0,0 +1,64 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.realtime.child;
+
+import java.io.IOException;
+
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.realtime.DragonJobConfig;
+import org.apache.hadoop.realtime.event.EventProducer;
+import org.apache.hadoop.realtime.mr.MapContext;
+import org.apache.hadoop.realtime.mr.Mapper;
+import org.apache.hadoop.realtime.protocol.DragonChildProtocol;
+import org.apache.hadoop.realtime.records.ChildExecutionContext;
+import org.apache.hadoop.util.ReflectionUtils;
+
+/**
+ */
+public class ReduceChildExecutor extends ChildExecutor {
+
+ @Override
+ @SuppressWarnings("unchecked")
+ protected void execute(
+ final Configuration conf, final DragonChildProtocol proxy,
+ final ChildExecutionContext context) throws IOException,
+ InterruptedException {
+ super.execute(conf, proxy, context);
+
+ // make a mapper
+ Mapper mapper =
+ (Mapper) ReflectionUtils.newInstance(
+ conf.getClass(DragonJobConfig.JOB_MAP_CLASS, Mapper.class), conf);
+ // fetch the input sub-dirs
+
+ // make the event producer
+ EventProducer input = null;
+
+ // if reduce number = 0 , then output = DirectOutputCollector
+ // else output = OutputCollector
+
+ // make a map context
+ MapContext mapContext = null;
+
+ // initialize the event producer
+ // run the mapper
+ mapper.run(mapContext);
+ // close the event producer
+
+ }
+}
diff --git a/hadoop-dragon-core/src/main/java/org/apache/hadoop/realtime/dag/DirectedAcyclicGraph.java b/hadoop-dragon-core/src/main/java/org/apache/hadoop/realtime/dag/DirectedAcyclicGraph.java
deleted file mode 100644
index 93efa31..0000000
--- a/hadoop-dragon-core/src/main/java/org/apache/hadoop/realtime/dag/DirectedAcyclicGraph.java
+++ /dev/null
@@ -1,634 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.hadoop.realtime.dag;
-
-import java.io.Serializable;
-import java.util.ArrayList;
-import java.util.Collection;
-import java.util.Collections;
-import java.util.ConcurrentModificationException;
-import java.util.HashSet;
-import java.util.Iterator;
-import java.util.LinkedHashMap;
-import java.util.List;
-import java.util.Map;
-import java.util.NoSuchElementException;
-import java.util.Set;
-import java.util.SortedSet;
-import java.util.TreeSet;
-
-/**
- *
- * DirectedAcyclicGraph implements a DAG that can be modified (vertices &
- * edges added and removed), is guaranteed to remain acyclic, and provides fast
- * topological order iteration.
- *
- *
- *
- * This class makes no claims to thread safety, and concurrent usage from
- * multiple threads will produce undefined results.
- *
- */
-public class DirectedAcyclicGraph implements Serializable {
- private static final long serialVersionUID = -8352536912775689101L;
-
- /* vertex -> internal vertex */
- private Map> vertexMap =
- new LinkedHashMap>();
- /* edge -> internal edge */
- private Map> edgeMap =
- new LinkedHashMap>();
-
- private transient Set unmodifiableEdgeSet = null;
- private transient Set unmodifiableVertexSet = null;
-
- private EdgeFactory edgeFactory;
-
- private TopoComparator topoComparator;
-
- private TopoOrderMap topoOrderMap;
-
- private int maxTopoIndex = 0;
- private int minTopoIndex = 0;
-
- // this update count is used to keep internal topological iterators honest
- private long topologyUpdateCount = 0;
-
- protected DirectedAcyclicGraph() {
- }
-
- /**
- * @param edgeClass
- */
- public DirectedAcyclicGraph(Class extends E> edgeClass) {
- this(new EdgeFactory(edgeClass));
- }
-
- public DirectedAcyclicGraph(EdgeFactory ef) {
- if (ef == null) {
- throw new NullPointerException();
- }
- edgeFactory = ef;
- initialize();
- }
-
- /**
- * set the topoOrderMap based on the current factory, and create the
- * comparator;
- */
- private void initialize()
- {
- topoOrderMap = TopoOrderMap.getInstance();
- topoComparator = new TopoComparator(topoOrderMap);
- }
-
- /**
- * Ensures that the specified vertex exists in this graph, or else throws
- * exception.
- *
- * @param v vertex
- *
- * @return true
if this assertion holds.
- *
- * @throws NullPointerException if specified vertex is null
.
- * @throws IllegalArgumentException if specified vertex does not exist in
- * this graph.
- */
- protected boolean assertVertexExist(V v)
- {
- if (containsVertex(v)) {
- return true;
- } else if (v == null) {
- throw new NullPointerException();
- } else {
- throw new IllegalArgumentException("no such vertex in graph");
- }
- }
-
- /**
- * A lazy build of edge container for specified vertex.
- *
- * @param vertex a vertex in this graph.
- */
- private InternalVertex getInternalVertex(V vertex) {
- assertVertexExist(vertex);
-
- InternalVertex ec = vertexMap.get(vertex);
-
- if (ec == null) {
- ec = new InternalVertex();
- vertexMap.put(vertex, ec);
- }
-
- return ec;
- }
-
- public V getEdgeSource(E e) {
- InternalEdge internalEdge = edgeMap.get(e);
- // nullable?
- return internalEdge.source;
- }
-
- public V getEdgeTarget(E e) {
- InternalEdge internalEdge = edgeMap.get(e);
- // nullable?
- return internalEdge.target;
- }
-
- /**
- * iterator will traverse the vertices in topological order, meaning that
- * for a directed graph G = (V,E), if there exists a path from vertex va to
- * vertex vb then va is guaranteed to come before vertex vb in the iteration
- * order.
- *
- * @return an iterator that will traverse the graph in topological order
- */
- public Iterator iterator() {
- return new TopoIterator();
- }
-
- /**
- * Adds the specified vertex to this graph if not already present, and puts it
- * at the top of the internal topological vertex ordering. More formally, adds
- * the specified vertex, v
, to this graph if this graph contains
- * no vertex u
such that
- * u.equals(v)
. If this graph already contains such vertex, the call
- * leaves this graph unchanged and returns false. In combination with
- * the restriction on constructors, this ensures that graphs never contain
- * duplicate vertices.
- *
- * @param v vertex to be added to this graph.
- *
- * @return true if this graph did not already contain the specified
- * vertex.
- *
- * @throws NullPointerException if the specified vertex is
- * null
.
- */
- public boolean addVertex(V v) {
- boolean added = addVertexInternal(v);
- if (added) {
- // add to the top
- ++maxTopoIndex;
- topoOrderMap.putVertex(maxTopoIndex, v);
-
- ++topologyUpdateCount;
- }
-
- return added;
- }
-
- protected boolean addVertexInternal(V v) {
- if (v == null) {
- throw new NullPointerException();
- } else if (containsVertex(v)) {
- return false;
- } else {
- vertexMap.put(v, new InternalVertex());
- return true;
- }
- }
-
- /**
- *
- * Adds the given edge and updates the internal topological order for
- * consistency IFF
- *
- *
- * - there is not already an edge (fromVertex, toVertex) in the graph
- *
- the edge does not induce a cycle in the graph
- *
- *
- *
- * @return null if the edge is already in the graph, else the created edge is
- * returned
- *
- * @throws IllegalArgumentException If either fromVertex or toVertex is not a
- * member of the graph
- * @throws CycleFoundException if the edge would induce a cycle in the graph
- */
- public E addEdge(V fromVertex, V toVertex)
- throws CycleFoundException {
- // we don't need to check the existence of those vertices here
- addVertex(fromVertex);
- addVertex(toVertex);
-
- Integer lb = topoOrderMap.getTopologicalIndex(toVertex);
- Integer ub = topoOrderMap.getTopologicalIndex(fromVertex);
-
- if (lb < ub) {
- Set df = new HashSet();
- Set db = new HashSet();
-
- // Discovery
- Region affectedRegion = new Region(lb, ub);
- Visited visited = Visited.getInstance(affectedRegion);
-
- // throws CycleFoundException if there is a cycle
- dfsF(toVertex, df, visited, affectedRegion);
-
- dfsB(fromVertex, db, visited, affectedRegion);
- reorder(df, db, visited);
- ++topologyUpdateCount; // if we do a reorder, than the topology has
- // been updated
- }
-
- return addEdgeInternal(fromVertex, toVertex);
- }
-
-
- protected E addEdgeInternal(V fromVertex, V toVertex) {
- E e = edgeFactory.createEdge(fromVertex, toVertex);
- if (containsEdge(e)) { // this restriction should stay!
- return null;
- } else {
- InternalEdge internalEdge = new InternalEdge(fromVertex, toVertex);
- edgeMap.put(e, internalEdge);
- vertexMap.get(fromVertex).addOutgoingEdge(e);
- vertexMap.get(toVertex).addIncomingEdge(e);
- return e;
- }
- }
-
- public boolean containsEdge(E e) {
- return edgeMap.containsKey(e);
- }
-
- public boolean containsVertex(V v) {
- return vertexMap.containsKey(v);
- }
-
- public boolean removeEdge(E e) {
- if (containsEdge(e)) {
- InternalEdge ie = edgeMap.get(e);
- vertexMap.get(ie.getSource()).removeOutgoingEdge(e);
- vertexMap.get(ie.getTarget()).removeOutgoingEdge(e);
- edgeMap.remove(e);
- return true;
- } else {
- return false;
- }
- }
-
- /**
- */
- public boolean removeAllEdges(Collection extends E> edges) {
- boolean modified = false;
-
- for (E e : edges) {
- modified |= removeEdge(e);
- }
-
- return modified;
- }
-
- /**
- * Removes the specified vertex from this graph including all its touching
- * edges if present. More formally, if the graph contains a vertex
- * u
such that u.equals(v)
, the call removes all edges
- * that touch u
and then removes u
itself. If no
- * such u
is found, the call leaves the graph unchanged.
- * Returns true if the graph contained the specified vertex. (The
- * graph will not contain the specified vertex once the call returns).
- *
- * If the specified vertex is null
returns
- * false
.
- *
- * @param v vertex to be removed from this graph, if present.
- *
- * @return true
if the graph contained the specified vertex;
- * false
otherwise.
- */
- public boolean removeVertex(V v) {
- boolean removed = removeVertexInternal(v);
- if (removed) {
- Integer topoIndex = topoOrderMap.removeVertex(v);
-
- // contract minTopoIndex as we are able
- if (topoIndex == minTopoIndex) {
- while ((minTopoIndex < 0)
- && (null == topoOrderMap.getVertex(minTopoIndex))) {
- ++minTopoIndex;
- }
- }
-
- // contract maxTopoIndex as we are able
- if (topoIndex == maxTopoIndex) {
- while ((maxTopoIndex > 0)
- && (null == topoOrderMap.getVertex(maxTopoIndex))) {
- --maxTopoIndex;
- }
- }
-
- ++topologyUpdateCount;
- }
-
- return removed;
- }
-
- protected boolean removeVertexInternal(V v) {
- if (containsVertex(v)) {
- InternalVertex iv = vertexMap.get(v);
- removeAllEdges(iv.getUnmodifiableIncomingEdges());
- removeAllEdges(iv.getUnmodifiableOutgoingEdges());
- vertexMap.remove(v);
- return true;
- } else {
- return false;
- }
- }
-
- public boolean removeAllVertices(Collection extends V> vertices) {
- boolean removed = removeAllVerticesInternal(vertices);
-
- topoOrderMap.removeAllVertices();
-
- maxTopoIndex = 0;
- minTopoIndex = 0;
-
- ++topologyUpdateCount;
-
- return removed;
- }
-
- /**
- */
- public boolean removeAllVerticesInternal(Collection extends V> vertices) {
- boolean modified = false;
-
- for (V v : vertices) {
- modified |= removeVertex(v);
- }
-
- return modified;
- }
-
- /**
- * @see DirectedGraph#incomingEdges(Object)
- */
- public Set incomingEdgesOf(V vertex) {
- return getInternalVertex(vertex).getUnmodifiableIncomingEdges();
- }
-
- /**
- * @see DirectedGraph#incomingEdges(Object)
- */
- public Set outgoingEdgesOf(V vertex) {
- return getInternalVertex(vertex).getUnmodifiableOutgoingEdges();
- }
-
-
- public Set vertexSet() {
- if (unmodifiableVertexSet == null) {
- unmodifiableVertexSet = Collections.unmodifiableSet(vertexMap.keySet());
- }
-
- return unmodifiableVertexSet;
- }
-
- public Set edgeSet() {
- if (unmodifiableEdgeSet == null) {
- unmodifiableEdgeSet = Collections.unmodifiableSet(edgeMap.keySet());
- }
-
- return unmodifiableEdgeSet;
- }
-
-
- /**
- * Depth first search forward, building up the set (df) of forward-connected
- * vertices in the Affected Region
- *
- * @param vertex the vertex being visited
- * @param df the set we are populating with forward connected vertices in
- * the Affected Region
- * @param visited a simple data structure that lets us know if we already
- * visited a node with a given topo index
- * @param topoIndexMap for quick lookups, a map from vertex to topo index in
- * the AR
- * @param ub the topo index of the original fromVertex -- used for cycle
- * detection
- *
- * @throws CycleFoundException if a cycle is discovered
- */
- private void
- dfsF(V vertex, Set df, Visited visited, Region affectedRegion)
- throws CycleFoundException {
- int topoIndex = topoOrderMap.getTopologicalIndex(vertex);
-
- // Assumption: vertex is in the AR and so it will be in visited
- visited.setVisited(topoIndex);
-
- df.add(vertex);
-
- for (E outEdge : outgoingEdgesOf(vertex)) {
- V nextVertex = getEdgeTarget(outEdge);
- Integer nextVertexTopoIndex =
- topoOrderMap.getTopologicalIndex(nextVertex);
-
- if (nextVertexTopoIndex.intValue() == affectedRegion.finish) {
- // reset visited
- try {
- for (V visitedVertex : df) {
- visited.clearVisited(topoOrderMap
- .getTopologicalIndex(visitedVertex));
- }
- } catch (UnsupportedOperationException e) {
- // okay, fine, some implementations (ones that automatically
- // clear themselves out) don't work this way
- }
- throw new CycleFoundException();
- }
-
- // note, order of checks is important as we need to make sure the
- // vertex is in the affected region before we check its visited
- // status (otherwise we will be causing an
- // ArrayIndexOutOfBoundsException).
- if (affectedRegion.isIn(nextVertexTopoIndex)
- && !visited.getVisited(nextVertexTopoIndex)) {
- dfsF(nextVertex, df, visited, affectedRegion); // recurse
- }
- }
- }
-
- /**
- * Depth first search backward, building up the set (db) of back-connected
- * vertices in the Affected Region
- *
- * @param vertex the vertex being visited
- * @param db the set we are populating with back-connected vertices in the AR
- * @param visited
- * @param topoIndexMap
- */
- private void
- dfsB(V vertex, Set db, Visited visited, Region affectedRegion) {
- // Assumption: vertex is in the AR and so we will get a topoIndex from
- // the map
- int topoIndex = topoOrderMap.getTopologicalIndex(vertex);
- visited.setVisited(topoIndex);
-
- db.add(vertex);
-
- for (E inEdge : incomingEdgesOf(vertex)) {
- V previousVertex = getEdgeSource(inEdge);
- Integer previousVertexTopoIndex =
- topoOrderMap.getTopologicalIndex(previousVertex);
-
- // note, order of checks is important as we need to make sure the
- // vertex is in the affected region before we check its visited
- // status (otherwise we will be causing an
- // ArrayIndexOutOfBoundsException).
- if (affectedRegion.isIn(previousVertexTopoIndex)
- && !visited.getVisited(previousVertexTopoIndex)) {
- // if prevousVertexTopoIndex != null, the vertex is in the
- // Affected Region according to our topoIndexMap
-
- dfsB(previousVertex, db, visited, affectedRegion);
- }
- }
- }
-
- @SuppressWarnings("unchecked")
- private void reorder(Set df, Set db, Visited visited) {
- List topoDf = new ArrayList(df);
- List topoDb = new ArrayList(db);
-
- Collections.sort(topoDf, topoComparator);
- Collections.sort(topoDb, topoComparator);
-
- // merge these suckers together in topo order
-
- SortedSet availableTopoIndices = new TreeSet();
-
- // we have to cast to the generic type, can't do "new V[size]" in java
- // 5;
- V[] bigL = (V[]) new Object[df.size() + db.size()];
- int lIndex = 0; // this index is used for the sole purpose of pushing
- // into
-
- // the correct index of bigL
-
- // assume (for now) that we are resetting visited
- boolean clearVisited = true;
-
- for (V vertex : topoDb) {
- Integer topoIndex = topoOrderMap.getTopologicalIndex(vertex);
-
- // add the available indices to the set
- availableTopoIndices.add(topoIndex);
-
- bigL[lIndex++] = vertex;
-
- if (clearVisited) { // reset visited status if supported
- try {
- visited.clearVisited(topoIndex);
- } catch (UnsupportedOperationException e) {
- clearVisited = false;
- }
- }
- }
-
- for (V vertex : topoDf) {
- Integer topoIndex = topoOrderMap.getTopologicalIndex(vertex);
-
- // add the available indices to the set
- availableTopoIndices.add(topoIndex);
- bigL[lIndex++] = vertex;
-
- if (clearVisited) { // reset visited status if supported
- try {
- visited.clearVisited(topoIndex);
- } catch (UnsupportedOperationException e) {
- clearVisited = false;
- }
- }
- }
-
- lIndex = 0; // reusing lIndex
- for (Integer topoIndex : availableTopoIndices) {
- // assign the indexes to the elements of bigL in order
- V vertex = bigL[lIndex++]; // note the post-increment
- topoOrderMap.putVertex(topoIndex, vertex);
- }
- }
-
- /**
- * iterator which follows topological order
- */
- private class TopoIterator implements Iterator {
- private int currentTopoIndex;
- private final long updateCountAtCreation;
- private Integer nextIndex = null;
-
- public TopoIterator() {
- updateCountAtCreation = topologyUpdateCount;
- currentTopoIndex = minTopoIndex - 1;
- }
-
- public boolean hasNext() {
- if (updateCountAtCreation != topologyUpdateCount) {
- throw new ConcurrentModificationException();
- }
-
- nextIndex = getNextIndex();
- return nextIndex != null;
- }
-
- public V next() {
- if (updateCountAtCreation != topologyUpdateCount) {
- throw new ConcurrentModificationException();
- }
-
- if (nextIndex == null) {
- // find nextIndex
- nextIndex = getNextIndex();
- }
- if (nextIndex == null) {
- throw new NoSuchElementException();
- }
- currentTopoIndex = nextIndex;
- nextIndex = null;
- return topoOrderMap.getVertex(currentTopoIndex); // topoToVertex.get(currentTopoIndex);
- }
-
- public void remove() {
- if (updateCountAtCreation != topologyUpdateCount) {
- throw new ConcurrentModificationException();
- }
-
- V vertexToRemove = null;
- if (null != (vertexToRemove = topoOrderMap.getVertex(currentTopoIndex))) {
- topoOrderMap.removeVertex(vertexToRemove);
- } else {
- // should only happen if next() hasn't been called
- throw new IllegalStateException();
- }
- }
-
- private Integer getNextIndex() {
- for (int i = currentTopoIndex + 1; i <= maxTopoIndex; i++) {
- if (null != topoOrderMap.getVertex(i)) {
- return i;
- }
- }
- return null;
- }
- }
-
-}
diff --git a/hadoop-dragon-core/src/main/java/org/apache/hadoop/realtime/dag/EdgeFactory.java b/hadoop-dragon-core/src/main/java/org/apache/hadoop/realtime/dag/EdgeFactory.java
deleted file mode 100644
index 6bed285..0000000
--- a/hadoop-dragon-core/src/main/java/org/apache/hadoop/realtime/dag/EdgeFactory.java
+++ /dev/null
@@ -1,45 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.hadoop.realtime.dag;
-
-import java.io.Serializable;
-
-/**
- * An {@link EdgeFactory} for producing edges by using a class as a factory.
- */
-public class EdgeFactory implements Serializable {
- private static final long serialVersionUID = -7890706652865009651L;
-
- private final Class extends E> edgeClass;
-
- public EdgeFactory(Class extends E> edgeClass) {
- this.edgeClass = edgeClass;
- }
-
- /**
- * @see EdgeFactory#createEdge(Object, Object)
- */
- public E createEdge(V source, V target) {
- try {
- return edgeClass.newInstance();
- } catch (Exception ex) {
- throw new RuntimeException("Edge factory failed", ex);
- }
- }
-}
\ No newline at end of file
diff --git a/hadoop-dragon-core/src/main/java/org/apache/hadoop/realtime/dag/InternalEdge.java b/hadoop-dragon-core/src/main/java/org/apache/hadoop/realtime/dag/InternalEdge.java
deleted file mode 100644
index db1c97e..0000000
--- a/hadoop-dragon-core/src/main/java/org/apache/hadoop/realtime/dag/InternalEdge.java
+++ /dev/null
@@ -1,52 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.hadoop.realtime.dag;
-
-import java.io.Serializable;
-
-/**
- * An internal view of edges from a {@link DirectedAcyclicGraph}
- */
-class InternalEdge implements Serializable {
- private static final long serialVersionUID = 1917792937475311485L;
-
- V source;
- V target;
-
- InternalEdge(V source, V target) {
- this.source = source;
- this.target = target;
- }
-
- /**
- * Get the source vertex of this edge.
- * @return the source vertex of this edge.
- */
- public V getSource() {
- return source;
- }
-
- /**
- * Get the target vertex of this edge.
- * @return the target vertex of this edge.
- */
- public V getTarget() {
- return target;
- }
-
-}
\ No newline at end of file
diff --git a/hadoop-dragon-core/src/main/java/org/apache/hadoop/realtime/dag/InternalVertex.java b/hadoop-dragon-core/src/main/java/org/apache/hadoop/realtime/dag/InternalVertex.java
deleted file mode 100644
index 18e1b36..0000000
--- a/hadoop-dragon-core/src/main/java/org/apache/hadoop/realtime/dag/InternalVertex.java
+++ /dev/null
@@ -1,64 +0,0 @@
-package org.apache.hadoop.realtime.dag;
-
-import java.io.Serializable;
-import java.util.Collections;
-import java.util.HashSet;
-import java.util.Set;
-
-/**
- * An internal view of vertices.
- */
-class InternalVertex implements Serializable {
- private static final long serialVersionUID = 337763367843853900L;
-
- Set incoming;
- Set outgoing;
- private transient Set unmodifiableIncoming = null;
- private transient Set unmodifiableOutgoing = null;
-
- InternalVertex() {
- incoming = new HashSet();
- outgoing = new HashSet();
- }
-
- /**
- * A lazy build of unmodifiable incoming edge set.
- *
- * @return
- */
- public Set getUnmodifiableIncomingEdges() {
- if (unmodifiableIncoming == null) {
- unmodifiableIncoming = Collections.unmodifiableSet(incoming);
- }
- return unmodifiableIncoming;
- }
-
- /**
- * A lazy build of unmodifiable outgoing edge set.
- *
- * @return
- */
- public Set getUnmodifiableOutgoingEdges() {
- if (unmodifiableOutgoing == null) {
- unmodifiableOutgoing = Collections.unmodifiableSet(outgoing);
- }
-
- return unmodifiableOutgoing;
- }
-
- public void addIncomingEdge(E e) {
- incoming.add(e);
- }
-
- public void addOutgoingEdge(E e) {
- outgoing.add(e);
- }
-
- public void removeIncomingEdge(E e) {
- incoming.remove(e);
- }
-
- public void removeOutgoingEdge(E e) {
- outgoing.remove(e);
- }
-}
\ No newline at end of file
diff --git a/hadoop-dragon-core/src/main/java/org/apache/hadoop/realtime/dag/TopoComparator.java b/hadoop-dragon-core/src/main/java/org/apache/hadoop/realtime/dag/TopoComparator.java
deleted file mode 100644
index 96f26e3..0000000
--- a/hadoop-dragon-core/src/main/java/org/apache/hadoop/realtime/dag/TopoComparator.java
+++ /dev/null
@@ -1,23 +0,0 @@
-package org.apache.hadoop.realtime.dag;
-
-import java.io.Serializable;
-import java.util.Comparator;
-
-/**
- * Note, this is a lazy and incomplete implementation, with assumptions that
- * inputs are in the given topoIndexMap
- */
-class TopoComparator implements Comparator, Serializable{
- private static final long serialVersionUID = 7091580997211005045L;
-
- private TopoOrderMap topoOrderMap;
-
- public TopoComparator(TopoOrderMap topoOrderMap) {
- this.topoOrderMap = topoOrderMap;
- }
-
- public int compare(V v1, V v2) {
- return topoOrderMap.getTopologicalIndex(v1).compareTo(
- topoOrderMap.getTopologicalIndex(v2));
- }
-}
\ No newline at end of file
diff --git a/hadoop-dragon-core/src/main/java/org/apache/hadoop/realtime/dag/TopoOrderMap.java b/hadoop-dragon-core/src/main/java/org/apache/hadoop/realtime/dag/TopoOrderMap.java
deleted file mode 100644
index b5a3f52..0000000
--- a/hadoop-dragon-core/src/main/java/org/apache/hadoop/realtime/dag/TopoOrderMap.java
+++ /dev/null
@@ -1,73 +0,0 @@
-package org.apache.hadoop.realtime.dag;
-
-import java.io.Serializable;
-import java.util.ArrayList;
-import java.util.HashMap;
-import java.util.List;
-import java.util.Map;
-
-/**
- * For performance and flexibility uses an ArrayList for topological index to
- * vertex mapping, and a HashMap for vertex to topological index mapping.
- */
-class TopoOrderMap implements Serializable {
- private static final long serialVersionUID = 7834322521006147372L;
-
- private final List topoToVertex = new ArrayList();
- private final Map vertexToTopo =
- new HashMap();
-
- public static TopoOrderMap getInstance() {
- return new TopoOrderMap();
- }
-
- public void putVertex(Integer index, V vertex) {
- int translatedIndex = translateIndex(index);
-
- // grow topoToVertex as needed to accommodate elements
- while ((translatedIndex + 1) > topoToVertex.size()) {
- topoToVertex.add(null);
- }
-
- topoToVertex.set(translatedIndex, vertex);
- vertexToTopo.put(vertex, index);
- }
-
- public V getVertex(Integer index) {
- return topoToVertex.get(translateIndex(index));
- }
-
- public Integer getTopologicalIndex(V vertex) {
- return vertexToTopo.get(vertex);
- }
-
- public Integer removeVertex(V vertex) {
- Integer topoIndex = vertexToTopo.remove(vertex);
- if (topoIndex != null) {
- topoToVertex.set(translateIndex(topoIndex), null);
- }
- return topoIndex;
- }
-
- public void removeAllVertices() {
- vertexToTopo.clear();
- topoToVertex.clear();
- }
-
- /**
- * We translate the topological index to an ArrayList index. We have to do
- * this because topological indices can be negative, and we want to do it
- * because we can make better use of space by only needing an ArrayList of
- * size |AR|.
- *
- * @param unscaledIndex
- *
- * @return the ArrayList index
- */
- private final int translateIndex(int index) {
- if (index >= 0) {
- return 2 * index;
- }
- return -1 * ((index * 2) - 1);
- }
-}
\ No newline at end of file
diff --git a/hadoop-dragon-core/src/main/java/org/apache/hadoop/realtime/dag/Visited.java b/hadoop-dragon-core/src/main/java/org/apache/hadoop/realtime/dag/Visited.java
deleted file mode 100644
index 10fc9ef..0000000
--- a/hadoop-dragon-core/src/main/java/org/apache/hadoop/realtime/dag/Visited.java
+++ /dev/null
@@ -1,75 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.hadoop.realtime.dag;
-
-import java.util.ArrayList;
-import java.util.List;
-
-/**
- */
-class Visited {
- private final List visited = new ArrayList();
-
- private Region affectedRegion;
-
- protected Visited(Region affectedRegion) {
- // Make sure visited is big enough
- int minSize = (affectedRegion.finish - affectedRegion.start) + 1;
- /* plus one because the region range is inclusive of both indices */
-
- while (visited.size() < minSize) {
- visited.add(Boolean.FALSE);
- }
-
- this.affectedRegion = affectedRegion;
- }
-
- public static Visited getInstance(Region affectedRegion) {
- return new Visited(affectedRegion);
- }
-
- public void setVisited(int index) {
- visited.set(translateIndex(index), Boolean.TRUE);
- }
-
- public boolean getVisited(int index) {
- Boolean result = null;
-
- result = visited.get(translateIndex(index));
-
- return result;
- }
-
- public void clearVisited(int index) throws UnsupportedOperationException {
- visited.set(translateIndex(index), Boolean.FALSE);
- }
-
- /**
- * We translate the topological index to an ArrayList index. We have to do
- * this because topological indices can be negative, and we want to do it
- * because we can make better use of space by only needing an ArrayList of
- * size |AR|.
- *
- * @param unscaledIndex
- *
- * @return the ArrayList index
- */
- private int translateIndex(int index) {
- return index - affectedRegion.start;
- }
-}
diff --git a/hadoop-dragon-core/src/main/java/org/apache/hadoop/realtime/event/DefaultEventProducer.java b/hadoop-dragon-core/src/main/java/org/apache/hadoop/realtime/event/DefaultEventProducer.java
new file mode 100644
index 0000000..abf2263
--- /dev/null
+++ b/hadoop-dragon-core/src/main/java/org/apache/hadoop/realtime/event/DefaultEventProducer.java
@@ -0,0 +1,264 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.realtime.event;
+
+import java.io.Closeable;
+import java.io.IOException;
+import java.text.NumberFormat;
+import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.Collections;
+import java.util.Comparator;
+import java.util.List;
+import java.util.concurrent.BlockingDeque;
+import java.util.concurrent.ExecutorService;
+import java.util.concurrent.Executors;
+import java.util.concurrent.LinkedBlockingDeque;
+import java.util.concurrent.TimeUnit;
+import java.util.regex.Pattern;
+
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.FSDataInputStream;
+import org.apache.hadoop.fs.FileStatus;
+import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.fs.PathFilter;
+import org.apache.hadoop.realtime.fs.StandardWatchEventKinds;
+import org.apache.hadoop.realtime.fs.WatchEvent;
+import org.apache.hadoop.realtime.fs.WatchKey;
+import org.apache.hadoop.realtime.fs.WatchService;
+import org.apache.hadoop.realtime.fs.WatchServiceFactory;
+import org.apache.hadoop.realtime.records.ChildExecutionContext;
+
+/**
+ */
+public class DefaultEventProducer implements
+ EventProducer, Closeable {
+ private static final Log LOG = LogFactory.getLog(DefaultEventProducer.class);
+
+ private static final NumberFormat fileNameFormat;
+ static {
+ fileNameFormat = NumberFormat.getInstance();
+ fileNameFormat.setMinimumIntegerDigits(20);
+ fileNameFormat.setMaximumFractionDigits(0);
+ fileNameFormat.setGroupingUsed(false);
+ }
+ private static final String FILE_SUFFIX = ".drg";
+ /* 20 digitals + FILE_SUFFIX */
+ private static final Pattern FILE_PATTERN = Pattern
+ .compile("\\d{20}\\" + FILE_SUFFIX);
+
+ private volatile boolean running = true;
+
+ private final Configuration conf;
+ private final FileSystem fileSystem;
+ private final Path parentDir;
+
+ private FSDataInputStream in;
+
+ private WatchService watchService;
+
+ private final BlockingDeque fifoQueue =
+ new LinkedBlockingDeque();
+
+ private final ExecutorService singleExecutor = Executors
+ .newSingleThreadExecutor();
+
+ public DefaultEventProducer(final Configuration conf, final Path path,
+ final int bufferSize) throws IOException {
+ this.conf = conf;
+ this.fileSystem = FileSystem.get(conf);
+ this.parentDir = path;
+ startWatchService();
+ }
+
+
+ public DefaultEventProducer(final Configuration conf, final Path path,
+ final int bufferSize, final long startOffset) throws IOException {
+ this.conf = conf;
+ this.fileSystem = FileSystem.get(conf);
+ this.parentDir = path;
+ initByOffset(fileSystem, parentDir, startOffset);
+ startWatchService();
+ }
+
+ @Override
+ public void initialize(ChildExecutionContext context) throws IOException,
+ InterruptedException {
+ // TODO Auto-generated method stub
+
+ }
+
+ /**
+ * take a event from the queue.
+ *
+ * @return the current key object or null if there isn't one
+ * @throws IOException
+ * @throws InterruptedException
+ */
+ @Override
+ public Event pollEvent() throws IOException,
+ InterruptedException {
+ FSDataInputStream input = getInputStream();
+ if (input == null) {
+ return null;
+ }
+
+ return null;
+ }
+
+ /**
+ * take a event from the queue.
+ *
+ * @return the current key object or null if there isn't one
+ * @throws IOException
+ * @throws InterruptedException
+ */
+ @Override
+ public Event pollEvent(long timeout, TimeUnit unit)
+ throws IOException, InterruptedException {
+ return null;
+ }
+
+ private void startWatchService() throws IOException {
+ this.watchService = WatchServiceFactory.newWatchService(this.conf);
+ this.watchService.register(this.parentDir,
+ StandardWatchEventKinds.ENTRY_CREATE);
+ this.singleExecutor.execute(new Runnable() {
+ public void run() {
+ // forever polling
+ while (DefaultEventProducer.this.running) {
+ WatchKey key;
+ try {
+ key = DefaultEventProducer.this.watchService.take();
+ } catch (InterruptedException x) {
+ continue;
+ }
+ // break if the directory was removed
+ if (!DefaultEventProducer.this.processEvents(key)) {
+ break;
+ }
+ }
+
+ try {
+ DefaultEventProducer.this.watchService.close();
+ } catch (IOException ioe) {
+ // log here
+ }
+ }
+ });
+ }
+
+ static String nameFromOffset(final long offset) {
+ return fileNameFormat.format(offset) + FILE_SUFFIX;
+ }
+
+ static long startOffsetFromName(final String name) {
+ return Long.parseLong(name.substring(0,
+ name.length() - FILE_SUFFIX.length()));
+ }
+
+ private void initByOffset(final FileSystem fs, final Path parent, final long offset)
+ throws IOException {
+ FileStatus[] statuses = fs.listStatus(parent, new PathFilter() {
+ public boolean accept(Path path) {
+ return FILE_PATTERN.matcher(path.getName()).matches()
+ && (nameFromOffset(offset).compareTo(path.getName()) >= 0);
+ }
+ });
+
+ if (statuses.length == 0) {
+ LOG.info("There is no existing file with its start offset larger than "
+ + offset);
+ return;
+ }
+
+ Arrays.sort(statuses, new Comparator() {
+ @Override
+ public int compare(final FileStatus lhs, final FileStatus rhs) {
+ return lhs.getPath().compareTo(rhs.getPath());
+ }
+ });
+
+ // FIXME: must consider about the length of file header
+ FileStatus lastFile = statuses[statuses.length - 1];
+ long lastFileOffset = startOffsetFromName(lastFile.getPath().getName());
+ if (offset > (lastFileOffset + lastFile.getLen())) {
+ LOG.info("The file corresponding to the offset " + offset
+ + " has been deleted.");
+ return;
+ }
+
+ for (FileStatus status : statuses) {
+ // add file into the queue only if it's newer than the tail of the queue
+ this.fifoQueue.offer(status.getPath());
+ }
+ }
+
+ private synchronized FSDataInputStream getInputStream() throws IOException {
+ if (this.in == null) {
+ Path path = this.fifoQueue.peek();
+ if (path == null) {
+ return null;
+ }
+ this.in = this.fileSystem.open(path);
+ LOG.info("Opening file" + path.toUri().getPath());
+ }
+ return this.in;
+ }
+
+ @SuppressWarnings({ "unchecked" })
+ private synchronized boolean processEvents(WatchKey key) {
+ List newList = new ArrayList();
+
+ for (WatchEvent> event : key.pollEvents()) {
+ WatchEvent ev = (WatchEvent) event;
+ Path eventPath = ev.context();
+ String realPath = eventPath.getName();
+ if (ev.kind() == StandardWatchEventKinds.ENTRY_CREATE
+ || ev.kind() == StandardWatchEventKinds.ENTRY_MODIFY) {
+ if (DefaultEventProducer.FILE_PATTERN.matcher(realPath).matches()) {
+ newList.add(eventPath);
+ }
+ }
+ }
+ Collections.sort(newList);
+ Path tail = this.fifoQueue.peekLast();
+ // add to the queue
+ for (Path path : newList) {
+ // add file into the queue only if it's newer than the tail of the queue
+ if (path.getName().endsWith(FILE_SUFFIX)
+ && (tail == null || path.compareTo(tail) > 0)) {
+ this.fifoQueue.offer(path);
+ } else {
+ LOG.info("ignore path " + path.toUri().getPath());
+ }
+ }
+ return key.reset();
+ }
+
+ @Override
+ public void close() throws IOException {
+ this.running = false;
+ this.singleExecutor.shutdown();
+ this.watchService.close();
+ }
+
+}
diff --git a/hadoop-dragon-core/src/main/java/org/apache/hadoop/realtime/dag/Region.java b/hadoop-dragon-core/src/main/java/org/apache/hadoop/realtime/event/DirectEventEmitter.java
similarity index 59%
rename from hadoop-dragon-core/src/main/java/org/apache/hadoop/realtime/dag/Region.java
rename to hadoop-dragon-core/src/main/java/org/apache/hadoop/realtime/event/DirectEventEmitter.java
index 320b061..f21201e 100644
--- a/hadoop-dragon-core/src/main/java/org/apache/hadoop/realtime/dag/Region.java
+++ b/hadoop-dragon-core/src/main/java/org/apache/hadoop/realtime/event/DirectEventEmitter.java
@@ -15,30 +15,31 @@
* See the License for the specific language governing permissions and
* limitations under the License.
*/
-package org.apache.hadoop.realtime.dag;
+package org.apache.hadoop.realtime.event;
+
+import java.io.IOException;
+import java.util.concurrent.TimeUnit;
/**
- * Region is an *inclusive* range of indices. Esthetically displeasing, but
- * convenient for our purposes.
*/
-public class Region {
-
- public final int start;
- public final int finish;
+public class DirectEventEmitter implements EventEmitter {
- public Region(int start, int finish) {
- if (start > finish) {
- throw new IllegalArgumentException("(start > finish): invariant broken");
- }
- this.start = start;
- this.finish = finish;
+ @Override
+ public boolean emitEvent(Event event) throws IOException,
+ InterruptedException {
+ return false;
}
- public int getSize() {
- return (finish - start) + 1;
+ @Override
+ public boolean
+ emitEvent(Event event, long timeout, TimeUnit unit)
+ throws IOException, InterruptedException {
+ return false;
}
- public boolean isIn(int index) {
- return (index >= start) && (index <= finish);
+ @Override
+ public void close() throws IOException {
+
}
-}
\ No newline at end of file
+
+}
diff --git a/hadoop-dragon-core/src/main/java/org/apache/hadoop/realtime/event/EventDispatcher.java b/hadoop-dragon-core/src/main/java/org/apache/hadoop/realtime/event/EventDispatcher.java
deleted file mode 100644
index a45b760..0000000
--- a/hadoop-dragon-core/src/main/java/org/apache/hadoop/realtime/event/EventDispatcher.java
+++ /dev/null
@@ -1,16 +0,0 @@
-package org.apache.hadoop.realtime.event;
-
-
-public interface EventDispatcher {
- /**
- * Dispatch event using stream name. Partitioners may be used to partition
- * the event, possibly based on a pre-determined set of fixed named keys.
- *
- * @param streamName
- * name of stream to dispatch on
- * @param key
- * object to dispatch
- */
- void dispatch(String streamName, KEY key, VALUE value);
-
-}
diff --git a/hadoop-dragon-core/src/main/java/org/apache/hadoop/realtime/event/EventEmitter.java b/hadoop-dragon-core/src/main/java/org/apache/hadoop/realtime/event/EventEmitter.java
new file mode 100644
index 0000000..80a60a0
--- /dev/null
+++ b/hadoop-dragon-core/src/main/java/org/apache/hadoop/realtime/event/EventEmitter.java
@@ -0,0 +1,42 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.realtime.event;
+
+import java.io.IOException;
+import java.util.concurrent.TimeUnit;
+
+/**
+ */
+public interface EventEmitter {
+ /**
+ * Generate an output event
+ */
+ public boolean emitEvent(Event event) throws IOException,
+ InterruptedException;
+
+ /**
+ * Generate an output event
+ */
+ public boolean emitEvent(Event event, long timeout,
+ TimeUnit unit) throws IOException, InterruptedException;
+
+ /**
+ * Close the event emitter.
+ */
+ public void close() throws IOException;
+}
diff --git a/hadoop-dragon-core/src/main/java/org/apache/hadoop/realtime/event/EventProcessor.java b/hadoop-dragon-core/src/main/java/org/apache/hadoop/realtime/event/EventProcessor.java
deleted file mode 100644
index 65fb505..0000000
--- a/hadoop-dragon-core/src/main/java/org/apache/hadoop/realtime/event/EventProcessor.java
+++ /dev/null
@@ -1,30 +0,0 @@
-package org.apache.hadoop.realtime.event;
-
-import java.io.IOException;
-
-import org.apache.hadoop.realtime.job.TaskAttempt;
-
-public abstract class EventProcessor {
-
- public abstract class Context
- implements TaskAttempt {
- }
-
- /**
- * Called once at the beginning of the task.
- */
- protected void setup(Context context
- ) throws IOException, InterruptedException {
- // NOTHING
- }
-
- protected abstract void execute(KEYIN key, VALUEIN value, Context context) throws IOException;
-
- /**
- * Called once at the end of the task.
- */
- protected void cleanup(Context context
- ) throws IOException, InterruptedException {
- // NOTHING
- }
-}
diff --git a/hadoop-dragon-core/src/main/java/org/apache/hadoop/realtime/event/EventProducer.java b/hadoop-dragon-core/src/main/java/org/apache/hadoop/realtime/event/EventProducer.java
index 7d6f6c2..7450208 100644
--- a/hadoop-dragon-core/src/main/java/org/apache/hadoop/realtime/event/EventProducer.java
+++ b/hadoop-dragon-core/src/main/java/org/apache/hadoop/realtime/event/EventProducer.java
@@ -2,8 +2,9 @@
import java.io.Closeable;
import java.io.IOException;
+import java.util.concurrent.TimeUnit;
-import org.apache.hadoop.realtime.job.TaskAttempt;
+import org.apache.hadoop.realtime.records.ChildExecutionContext;
/**
*
@@ -17,7 +18,8 @@ public interface EventProducer extends Closeable {
* @throws IOException
* @throws InterruptedException
*/
- public void initialize(TaskAttempt context) throws IOException, InterruptedException;
+ public void initialize(ChildExecutionContext context) throws IOException,
+ InterruptedException;
/**
* Read the next event.
@@ -26,20 +28,20 @@ public interface EventProducer extends Closeable {
* @throws IOException
* @throws InterruptedException
*/
- public boolean nextEvent() throws IOException, InterruptedException;
+ public Event pollEvent() throws IOException, InterruptedException;
/**
- * Get the current event.
+ * Read the next event.
*
- * @return the object that was read
+ * @return true if a key/value pair was read
* @throws IOException
* @throws InterruptedException
*/
- public Event getCurrentEvent() throws IOException, InterruptedException;
-
+ public Event pollEvent(long timeout, TimeUnit unit)
+ throws IOException, InterruptedException;
/**
- * Close the record reader.
+ * Close the event producer.
*/
public void close() throws IOException;
}
diff --git a/hadoop-dragon-core/src/main/java/org/apache/hadoop/realtime/event/EventSink.java b/hadoop-dragon-core/src/main/java/org/apache/hadoop/realtime/event/EventUtils.java
similarity index 89%
rename from hadoop-dragon-core/src/main/java/org/apache/hadoop/realtime/event/EventSink.java
rename to hadoop-dragon-core/src/main/java/org/apache/hadoop/realtime/event/EventUtils.java
index e923458..3707204 100644
--- a/hadoop-dragon-core/src/main/java/org/apache/hadoop/realtime/event/EventSink.java
+++ b/hadoop-dragon-core/src/main/java/org/apache/hadoop/realtime/event/EventUtils.java
@@ -18,8 +18,7 @@
package org.apache.hadoop.realtime.event;
/**
- * Receives and persists the terminal {@link Event}
*/
-public interface EventSink {
- public void sunk();
+public class EventUtils {
+
}
diff --git a/hadoop-dragon-core/src/main/java/org/apache/hadoop/realtime/event/SequenceFileEventProduer.java b/hadoop-dragon-core/src/main/java/org/apache/hadoop/realtime/event/SequenceFileEventProduer.java
deleted file mode 100644
index 7d0cfb7..0000000
--- a/hadoop-dragon-core/src/main/java/org/apache/hadoop/realtime/event/SequenceFileEventProduer.java
+++ /dev/null
@@ -1,70 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.hadoop.realtime.event;
-
-import java.io.IOException;
-
-import org.apache.hadoop.conf.Configuration;
-import org.apache.hadoop.fs.FileSystem;
-import org.apache.hadoop.fs.Path;
-import org.apache.hadoop.io.SequenceFile;
-import org.apache.hadoop.realtime.job.TaskAttempt;
-
-/**
- * An {@link EventProducer} that continuously produce {@link Event}
- * from {@link SequenceFile}s by reading key, value pairs.
- */
-public class SequenceFileEventProduer implements
- EventProducer {
-
- private SequenceFile.Reader in;
- protected Configuration conf;
-
- public SequenceFileEventProduer(Configuration conf, Path path)
- throws IOException {
- FileSystem fs = path.getFileSystem(conf);
-
- }
-
- public SequenceFileEventProduer(Configuration conf, Path path, long offset)
- throws IOException {
-
- }
-
- @Override
- public void initialize(TaskAttempt context) throws IOException,
- InterruptedException {
-
- }
-
- @Override
- public boolean nextEvent() throws IOException, InterruptedException {
- return false;
- }
-
- @Override
- public Event getCurrentEvent() throws IOException,
- InterruptedException {
- return null;
- }
-
- @Override
- public void close() throws IOException {
- }
-
-}
diff --git a/hadoop-dragon-core/src/main/java/org/apache/hadoop/realtime/fs/AbstractWatchService.java b/hadoop-dragon-core/src/main/java/org/apache/hadoop/realtime/fs/AbstractWatchService.java
index cc782a2..8539117 100644
--- a/hadoop-dragon-core/src/main/java/org/apache/hadoop/realtime/fs/AbstractWatchService.java
+++ b/hadoop-dragon-core/src/main/java/org/apache/hadoop/realtime/fs/AbstractWatchService.java
@@ -52,12 +52,6 @@ public void cancel() {
protected AbstractWatchService() {
}
- /**
- * Register the given object with this watch service
- */
- abstract WatchKey register(Path path, WatchEvent.Kind>... events)
- throws IOException;
-
// used by AbstractWatchKey to enqueue key
final void enqueueKey(WatchKey key) {
pendingKeys.offer(key);
diff --git a/hadoop-dragon-core/src/main/java/org/apache/hadoop/realtime/fs/DefaultWatchService.java b/hadoop-dragon-core/src/main/java/org/apache/hadoop/realtime/fs/DefaultWatchService.java
index 9004593..d886320 100644
--- a/hadoop-dragon-core/src/main/java/org/apache/hadoop/realtime/fs/DefaultWatchService.java
+++ b/hadoop-dragon-core/src/main/java/org/apache/hadoop/realtime/fs/DefaultWatchService.java
@@ -71,7 +71,7 @@ public Thread newThread(Runnable r) {
* Register the given file with this watch service
*/
@Override
- WatchKey register(final Path path, WatchEvent.Kind>... events)
+ public WatchKey register(final Path path, WatchEvent.Kind>... events)
throws IOException {
// check events - CCE will be thrown if there are invalid elements
if (events.length == 0)
@@ -265,7 +265,7 @@ public void run() {
}
};
this.poller =
- scheduledExecutor.scheduleAtFixedRate(thunk, period, period,
+ scheduledExecutor.scheduleAtFixedRate(thunk, 0, period,
TimeUnit.MILLISECONDS);
}
}
diff --git a/hadoop-dragon-core/src/main/java/org/apache/hadoop/realtime/fs/WatchService.java b/hadoop-dragon-core/src/main/java/org/apache/hadoop/realtime/fs/WatchService.java
index 7777d2f..e0ef2c9 100644
--- a/hadoop-dragon-core/src/main/java/org/apache/hadoop/realtime/fs/WatchService.java
+++ b/hadoop-dragon-core/src/main/java/org/apache/hadoop/realtime/fs/WatchService.java
@@ -22,6 +22,8 @@
import java.io.IOException;
import java.util.concurrent.TimeUnit;
+import org.apache.hadoop.fs.Path;
+
/**
* A watch service that watches registered objects for changes and
* events. For example a file manager may use a watch service to monitor a
@@ -119,6 +121,14 @@ public interface WatchService
*/
@Override
void close() throws IOException;
+
+
+ /**
+ * Register the given object with this watch service
+ */
+ WatchKey register(Path path, WatchEvent.Kind>... events)
+ throws IOException;
+
/**
* Retrieves and removes the next watch key, or {@code null} if none are
diff --git a/hadoop-dragon-core/src/main/java/org/apache/hadoop/realtime/job/Mapper.java b/hadoop-dragon-core/src/main/java/org/apache/hadoop/realtime/job/Mapper.java
deleted file mode 100644
index fd9c9bf..0000000
--- a/hadoop-dragon-core/src/main/java/org/apache/hadoop/realtime/job/Mapper.java
+++ /dev/null
@@ -1,5 +0,0 @@
-package org.apache.hadoop.realtime.job;
-
-public class Mapper {
-
-}
diff --git a/hadoop-dragon-core/src/main/java/org/apache/hadoop/realtime/job/Reducer.java b/hadoop-dragon-core/src/main/java/org/apache/hadoop/realtime/job/Reducer.java
deleted file mode 100644
index 3cedc56..0000000
--- a/hadoop-dragon-core/src/main/java/org/apache/hadoop/realtime/job/Reducer.java
+++ /dev/null
@@ -1,5 +0,0 @@
-package org.apache.hadoop.realtime.job;
-
-public class Reducer {
-
-}
diff --git a/hadoop-dragon-core/src/main/java/org/apache/hadoop/realtime/mr/MapContext.java b/hadoop-dragon-core/src/main/java/org/apache/hadoop/realtime/mr/MapContext.java
new file mode 100644
index 0000000..8784a63
--- /dev/null
+++ b/hadoop-dragon-core/src/main/java/org/apache/hadoop/realtime/mr/MapContext.java
@@ -0,0 +1,38 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.realtime.mr;
+
+import org.apache.hadoop.classification.InterfaceAudience;
+import org.apache.hadoop.classification.InterfaceStability;
+import org.apache.hadoop.realtime.child.Context;
+
+/**
+ * The context that is given to the {@link Mapper}.
+ * @param the key input type to the Mapper
+ * @param the value input type to the Mapper
+ * @param the key output type from the Mapper
+ * @param the value output type from the Mapper
+ */
+@InterfaceAudience.Public
+@InterfaceStability.Evolving
+public interface MapContext
+ extends Context {
+
+}
+
\ No newline at end of file
diff --git a/hadoop-dragon-core/src/main/java/org/apache/hadoop/realtime/mr/MapContextImpl.java b/hadoop-dragon-core/src/main/java/org/apache/hadoop/realtime/mr/MapContextImpl.java
new file mode 100644
index 0000000..da01298
--- /dev/null
+++ b/hadoop-dragon-core/src/main/java/org/apache/hadoop/realtime/mr/MapContextImpl.java
@@ -0,0 +1,101 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.realtime.mr;
+
+import java.io.IOException;
+import java.util.concurrent.TimeUnit;
+
+import org.apache.hadoop.classification.InterfaceAudience;
+import org.apache.hadoop.classification.InterfaceStability;
+import org.apache.hadoop.realtime.conf.DragonConfiguration;
+import org.apache.hadoop.realtime.event.Event;
+import org.apache.hadoop.realtime.records.TaskAttemptId;
+import org.apache.hadoop.realtime.records.TaskType;
+
+/**
+ * The context that is given to the {@link Mapper}.
+ *
+ * @param the key input type to the Mapper
+ * @param the value input type to the Mapper
+ * @param the key output type from the Mapper
+ * @param the value output type from the Mapper
+ */
+@InterfaceAudience.Private
+@InterfaceStability.Unstable
+public class MapContextImpl implements
+ MapContext {
+
+ @Override
+ public DragonConfiguration getConfiguration() {
+ // TODO Auto-generated method stub
+ return null;
+ }
+
+ @Override
+ public TaskAttemptId getTaskAttemptId() {
+ // TODO Auto-generated method stub
+ return null;
+ }
+
+ @Override
+ public int getPartition() {
+ // TODO Auto-generated method stub
+ return 0;
+ }
+
+ @Override
+ public String getUser() {
+ // TODO Auto-generated method stub
+ return null;
+ }
+
+ @Override
+ public TaskType getTaskType() {
+ // TODO Auto-generated method stub
+ return null;
+ }
+
+ @Override
+ public Event pollEvent() throws IOException,
+ InterruptedException {
+ // TODO Auto-generated method stub
+ return null;
+ }
+
+ @Override
+ public Event pollEvent(long timeout, TimeUnit unit)
+ throws IOException, InterruptedException {
+ // TODO Auto-generated method stub
+ return null;
+ }
+
+ @Override
+ public boolean emitEvent(Event event) throws IOException,
+ InterruptedException {
+ return false;
+ }
+
+ @Override
+ public boolean emitEvent(Event event, long timeout,
+ TimeUnit unit) throws IOException, InterruptedException {
+ // TODO Auto-generated method stub
+ return false;
+ }
+
+}
diff --git a/hadoop-dragon-core/src/main/java/org/apache/hadoop/realtime/mr/Mapper.java b/hadoop-dragon-core/src/main/java/org/apache/hadoop/realtime/mr/Mapper.java
new file mode 100644
index 0000000..f156641
--- /dev/null
+++ b/hadoop-dragon-core/src/main/java/org/apache/hadoop/realtime/mr/Mapper.java
@@ -0,0 +1,79 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.realtime.mr;
+
+import java.io.IOException;
+
+import org.apache.hadoop.classification.InterfaceAudience;
+import org.apache.hadoop.classification.InterfaceStability;
+import org.apache.hadoop.realtime.event.Event;
+
+@InterfaceAudience.Public
+@InterfaceStability.Evolving
+public abstract class Mapper {
+
+ /**
+ * Called once at the beginning of the task.
+ */
+ protected void setup(MapContext context)
+ throws IOException, InterruptedException {
+ // NOTHING
+ }
+
+ /**
+ * Called once for each event in the input. Most applications should override
+ * this, but the default is the identity function.
+ */
+ @SuppressWarnings("unchecked")
+ protected void map(Event event,
+ MapContext context) throws IOException,
+ InterruptedException {
+ context.emitEvent((Event) event);
+ }
+
+ /**
+ * Called once at the end of the task.
+ */
+ protected void cleanup(MapContext context)
+ throws IOException, InterruptedException {
+ // NOTHING
+ }
+
+ /**
+ * Expert users can override this method for more complete control over the
+ * execution of the Mapper.
+ *
+ * @param context
+ * @throws IOException
+ */
+ public void run(MapContext context)
+ throws IOException, InterruptedException {
+ setup(context);
+ try {
+ while (true) {
+ Event evt = context.pollEvent();
+ if (evt != null)
+ map(evt, context);
+ }
+ } catch (InterruptedException ie) {
+ cleanup(context);
+ throw ie;
+ }
+ }
+}
diff --git a/hadoop-dragon-core/src/main/java/org/apache/hadoop/realtime/mr/Partitioner.java b/hadoop-dragon-core/src/main/java/org/apache/hadoop/realtime/mr/Partitioner.java
new file mode 100644
index 0000000..b748977
--- /dev/null
+++ b/hadoop-dragon-core/src/main/java/org/apache/hadoop/realtime/mr/Partitioner.java
@@ -0,0 +1,49 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.realtime.mr;
+
+import org.apache.hadoop.realtime.event.Event;
+
+/**
+ * Partitions the key space.
+ *
+ * Partitioner
controls the partitioning of the keys of the
+ * intermediate map-outputs. The key (or a subset of the key) is used to derive
+ * the partition, typically by a hash function. The total number of partitions
+ * is the same as the number of reduce tasks for the job. Hence this controls
+ * which of the m
reduce tasks the intermediate key (and hence the
+ * record) is sent for reduction.
+ *
+ * @see Reducer
+ */
+public interface Partitioner{
+
+ /**
+ * Get the paritition number for a given key (hence record) given the total
+ * number of partitions i.e. number of reduce-tasks for the job.
+ *
+ * Typically a hash function on a all or a subset of the key.
+ *
+ * @param key the key to be paritioned.
+ * @param value the entry value.
+ * @param numPartitions the total number of partitions.
+ * @return the partition number for the key
.
+ */
+ int getPartition(Event event, int numPartitions);
+}
diff --git a/hadoop-dragon-core/src/main/java/org/apache/hadoop/realtime/dag/CycleFoundException.java b/hadoop-dragon-core/src/main/java/org/apache/hadoop/realtime/mr/ReduceContext.java
similarity index 68%
rename from hadoop-dragon-core/src/main/java/org/apache/hadoop/realtime/dag/CycleFoundException.java
rename to hadoop-dragon-core/src/main/java/org/apache/hadoop/realtime/mr/ReduceContext.java
index eefaecf..7ff982e 100644
--- a/hadoop-dragon-core/src/main/java/org/apache/hadoop/realtime/dag/CycleFoundException.java
+++ b/hadoop-dragon-core/src/main/java/org/apache/hadoop/realtime/mr/ReduceContext.java
@@ -15,13 +15,17 @@
* See the License for the specific language governing permissions and
* limitations under the License.
*/
+package org.apache.hadoop.realtime.mr;
-package org.apache.hadoop.realtime.dag;
+import org.apache.hadoop.classification.InterfaceAudience;
+import org.apache.hadoop.classification.InterfaceStability;
+import org.apache.hadoop.realtime.child.Context;
/**
- * Exception used on a {@link DirectedAcyclicGraph} when a cycle is found
- *
*/
-@SuppressWarnings("serial")
-public class CycleFoundException extends Exception {
+@InterfaceAudience.Public
+@InterfaceStability.Evolving
+public interface ReduceContext extends
+ Context {
+
}
diff --git a/hadoop-dragon-core/src/main/java/org/apache/hadoop/realtime/mr/Reducer.java b/hadoop-dragon-core/src/main/java/org/apache/hadoop/realtime/mr/Reducer.java
new file mode 100644
index 0000000..4eb05bc
--- /dev/null
+++ b/hadoop-dragon-core/src/main/java/org/apache/hadoop/realtime/mr/Reducer.java
@@ -0,0 +1,77 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.realtime.mr;
+
+import java.io.IOException;
+import org.apache.hadoop.classification.InterfaceAudience;
+import org.apache.hadoop.classification.InterfaceStability;
+import org.apache.hadoop.realtime.child.Context;
+import org.apache.hadoop.realtime.event.Event;
+
+/**
+ */
+@InterfaceAudience.Public
+@InterfaceStability.Evolving
+public abstract class Reducer {
+
+ /**
+ * Called once at the start of the task.
+ */
+ protected void setup(ReduceContext context)
+ throws IOException, InterruptedException {
+ // NOTHING
+ }
+
+ /**
+ * This method is called once for each key. Most applications will define
+ * their reduce class by overriding this method. The default implementation is
+ * an identity function.
+ */
+ @SuppressWarnings("unchecked")
+ protected abstract void reduce(Event event,
+ ReduceContext context)
+ throws IOException, InterruptedException;
+
+ /**
+ * Called once at the end of the task.
+ */
+ protected void
+ cleanup(ReduceContext context)
+ throws IOException, InterruptedException {
+ // NOTHING
+ }
+
+ /**
+ * Advanced application writers can use the
+ * {@link #run(org.apache.hadoop.mapreduce.Reducer.Context)} method to control
+ * how the reduce task works.
+ */
+ public void run(ReduceContext context)
+ throws IOException, InterruptedException {
+ setup(context);
+ try {
+ while (true) {
+ reduce(context.pollEvent(), context);
+ }
+ } catch (InterruptedException ie) {
+ cleanup(context);
+ throw ie;
+ }
+ }
+}