diff --git a/hadoop-dragon-core/src/main/java/org/apache/hadoop/realtime/DragonJob.java b/hadoop-dragon-core/src/main/java/org/apache/hadoop/realtime/DragonJob.java index be8ae6a..4d4eacb 100644 --- a/hadoop-dragon-core/src/main/java/org/apache/hadoop/realtime/DragonJob.java +++ b/hadoop-dragon-core/src/main/java/org/apache/hadoop/realtime/DragonJob.java @@ -29,6 +29,8 @@ import org.apache.hadoop.classification.InterfaceStability; import org.apache.hadoop.conf.Configuration; import org.apache.hadoop.realtime.conf.DragonConfiguration; +import org.apache.hadoop.realtime.mr.Mapper; +import org.apache.hadoop.realtime.mr.Reducer; import org.apache.hadoop.realtime.records.CounterGroup; import org.apache.hadoop.realtime.records.Counters; import org.apache.hadoop.realtime.records.JobId; @@ -322,11 +324,25 @@ public JobState getState() { return null; } - public void setMapper(Class clazz){ - conf.setClass(DragonJobConfig.JOB_MAP_CLASS, clazz, Object.class); + /** + * Set the {@link Mapper} for the job. + * @param cls the Mapper to use + * @throws IllegalStateException if the job is submitted + */ + public void setMapperClass(Class clazz) + throws IllegalStateException { + ensureState(JobState.NEW); + conf.setClass(DragonJobConfig.JOB_MAP_CLASS, clazz, Mapper.class); } - - public void setReducer(Class clazz){ - conf.setClass(DragonJobConfig.JOB_REDUCE_CLASS, clazz, Object.class); + + /** + * Set the {@link Reducer} for the job. + * @param cls the Reducer to use + * @throws IllegalStateException if the job is submitted + */ + public void setReducerClass(Class clazz) + throws IllegalStateException { + ensureState(JobState.NEW); + conf.setClass(DragonJobConfig.JOB_REDUCE_CLASS, clazz, Reducer.class); } } diff --git a/hadoop-dragon-core/src/main/java/org/apache/hadoop/realtime/DragonJobConfig.java b/hadoop-dragon-core/src/main/java/org/apache/hadoop/realtime/DragonJobConfig.java index 0860ab1..94e416f 100644 --- a/hadoop-dragon-core/src/main/java/org/apache/hadoop/realtime/DragonJobConfig.java +++ b/hadoop-dragon-core/src/main/java/org/apache/hadoop/realtime/DragonJobConfig.java @@ -292,11 +292,13 @@ public class DragonJobConfig { public static final int DEFAULT_JOB_HISTORY_USE_BATCHED_FLUSH_QUEUE_SIZE_THRESHOLD = 50; - public static final String JOB_MAP_CLASS = "map.class"; + public static final String JOB_MAP_CLASS = "dragon.map.class"; - public static final String JOB_REDUCE_CLASS = "reduce.class"; + public static final String JOB_REDUCE_CLASS = "dragon.reduce.class"; - public static final String MAP_PARALLELISM = "map.parallelism"; + public static final String JOB_OUTPUT_DIR = "dragon.output.dir"; + + public static final String MAP_PARALLELISM = "dragon.map.tasks"; - public static final String REDUCE_PARALLELISM = "reduce.parallelism"; + public static final String REDUCE_PARALLELISM = "dragon.reduce.tasks"; } diff --git a/hadoop-dragon-core/src/main/java/org/apache/hadoop/realtime/child/ChildExecutor.java b/hadoop-dragon-core/src/main/java/org/apache/hadoop/realtime/child/ChildExecutor.java index e2f599b..be91059 100644 --- a/hadoop-dragon-core/src/main/java/org/apache/hadoop/realtime/child/ChildExecutor.java +++ b/hadoop-dragon-core/src/main/java/org/apache/hadoop/realtime/child/ChildExecutor.java @@ -17,17 +17,219 @@ */ package org.apache.hadoop.realtime.child; +import java.io.IOException; +import java.util.ArrayList; +import java.util.List; +import java.util.concurrent.TimeUnit; + import org.apache.hadoop.conf.Configuration; +import org.apache.hadoop.fs.FileSystem; +import org.apache.hadoop.fs.FileSystem.Statistics; +import org.apache.hadoop.fs.Path; +import org.apache.hadoop.realtime.DragonJobConfig; +import org.apache.hadoop.realtime.conf.DragonConfiguration; +import org.apache.hadoop.realtime.event.Event; +import org.apache.hadoop.realtime.event.EventEmitter; +import org.apache.hadoop.realtime.event.EventProducer; +import org.apache.hadoop.realtime.mr.MapContext; +import org.apache.hadoop.realtime.mr.Mapper; import org.apache.hadoop.realtime.protocol.DragonChildProtocol; import org.apache.hadoop.realtime.records.ChildExecutionContext; +import org.apache.hadoop.realtime.records.Counters; +import org.apache.hadoop.realtime.records.TaskType; +import org.apache.hadoop.util.ReflectionUtils; /** */ -public class ChildExecutor { +abstract class ChildExecutor { + // @InterfaceAudience.Private + // @InterfaceStability.Unstable + // protected class TaskReporter implements StatusReporter, Runnable { + // private DragonChildProtocol umbilical;; + // private DragonConfiguration conf; + // private Progress taskProgress; + // private Thread pingThread = null; + // private boolean done = true; + // private Object lock = new Object(); + // + // TaskReporter(Progress taskProgress, DragonChildProtocol umbilical) { + // this.umbilical = umbilical; + // this.taskProgress = taskProgress; + // } + // + // + // public void setStatus(String status) { + // taskProgress.setStatus(normalizeStatus(status, conf)); + // } + // + // public Counters.Counter getCounter(String group, String name) { + // Counters.Counter counter = null; + // if (counters != null) { + // counter = counters.findCounter(group, name); + // } + // return counter; + // } + // + // public Counters.Counter getCounter(Enum name) { + // return counters == null ? null : counters.findCounter(name); + // } + // + // public void incrCounter(Enum key, long amount) { + // if (counters != null) { + // counters.incrCounter(key, amount); + // } + // } + // + // public void incrCounter(String group, String counter, long amount) { + // if (counters != null) { + // counters.incrCounter(group, counter, amount); + // } + // if (skipping + // && SkipBadRecords.COUNTER_GROUP.equals(group) + // && (SkipBadRecords.COUNTER_MAP_PROCESSED_RECORDS.equals(counter) || + // SkipBadRecords.COUNTER_REDUCE_PROCESSED_GROUPS + // .equals(counter))) { + // // if application reports the processed records, move the + // // currentRecStartIndex to the next. + // // currentRecStartIndex is the start index which has not yet been + // // finished and is still in task's stomach. + // for (int i = 0; i < amount; i++) { + // currentRecStartIndex = currentRecIndexIterator.next(); + // } + // } + // setProgressFlag(); + // } + // + // /** + // * The communication thread handles communication with the parent (Task + // * Tracker). It sends progress updates if progress has been made or if the + // * task needs to let the parent know that it's alive. It also pings the + // * parent to see if it's alive. + // */ + // public void run() { + // final int MAX_RETRIES = 3; + // int remainingRetries = MAX_RETRIES; + // // get current flag value and reset it as well + // boolean sendProgress = resetProgressFlag(); + // while (!taskDone.get()) { + // synchronized (lock) { + // done = false; + // } + // try { + // boolean taskFound = true; // whether TT knows about this task + // // sleep for a bit + // synchronized (lock) { + // if (taskDone.get()) { + // break; + // } + // lock.wait(PROGRESS_INTERVAL); + // } + // if (taskDone.get()) { + // break; + // } + // + // if (sendProgress) { + // // we need to send progress update + // updateCounters(); + // taskStatus.statusUpdate(taskProgress.get(), + // taskProgress.toString(), counters); + // taskFound = umbilical.statusUpdate(taskId, taskStatus); + // taskStatus.clearStatus(); + // } else { + // // send ping + // taskFound = umbilical.ping(taskId); + // } + // + // // if Task Tracker is not aware of our task ID (probably because it + // // died and + // // came back up), kill ourselves + // if (!taskFound) { + // LOG.warn("Parent died. Exiting " + taskId); + // resetDoneFlag(); + // System.exit(66); + // } + // + // sendProgress = resetProgressFlag(); + // remainingRetries = MAX_RETRIES; + // } catch (Throwable t) { + // LOG.info("Communication exception: " + // + StringUtils.stringifyException(t)); + // remainingRetries -= 1; + // if (remainingRetries == 0) { + // ReflectionUtils.logThreadInfo(LOG, "Communication exception", 0); + // LOG.warn("Last retry, killing " + taskId); + // resetDoneFlag(); + // System.exit(65); + // } + // } + // } + // // Notify that we are done with the work + // resetDoneFlag(); + // } + // + // void resetDoneFlag() { + // synchronized (lock) { + // done = true; + // lock.notify(); + // } + // } + // + // public void startCommunicationThread() { + // if (pingThread == null) { + // pingThread = new Thread(this, "communication thread"); + // pingThread.setDaemon(true); + // pingThread.start(); + // } + // } + // + // public void stopCommunicationThread() throws InterruptedException { + // if (pingThread != null) { + // // Intent of the lock is to not send an interupt in the middle of an + // // umbilical.ping or umbilical.statusUpdate + // synchronized (lock) { + // // Interrupt if sleeping. Otherwise wait for the RPC call to return. + // lock.notify(); + // } + // + // synchronized (lock) { + // while (!done) { + // lock.wait(); + // } + // } + // pingThread.interrupt(); + // pingThread.join(); + // } + // } + // } - public static void run(Configuration conf, DragonChildProtocol proxy, - ChildExecutionContext context) { + /** + * Gets a handle to the Statistics instance based on the scheme associated + * with path. + * + * @param path the path. + * @param conf the configuration to extract the scheme from if not part of the + * path. + * @return a Statistics instance, or null if none is found for the scheme. + */ + protected static List getFsStatistics(Path path, + Configuration conf) throws IOException { + List matchedStats = new ArrayList(); + path = path.getFileSystem(conf).makeQualified(path); + String scheme = path.toUri().getScheme(); + for (Statistics stats : FileSystem.getAllStatistics()) { + if (stats.getScheme().equals(scheme)) { + matchedStats.add(stats); + } + } + return matchedStats; + } - + protected void execute( + final Configuration conf, final DragonChildProtocol proxy, + final ChildExecutionContext context) throws IOException, + InterruptedException { + // make a config helper so we can get the classes } + + } diff --git a/hadoop-dragon-core/src/main/java/org/apache/hadoop/realtime/event/EventSource.java b/hadoop-dragon-core/src/main/java/org/apache/hadoop/realtime/child/ChildExecutorFactory.java similarity index 67% rename from hadoop-dragon-core/src/main/java/org/apache/hadoop/realtime/event/EventSource.java rename to hadoop-dragon-core/src/main/java/org/apache/hadoop/realtime/child/ChildExecutorFactory.java index ed973f6..b3d4a80 100644 --- a/hadoop-dragon-core/src/main/java/org/apache/hadoop/realtime/event/EventSource.java +++ b/hadoop-dragon-core/src/main/java/org/apache/hadoop/realtime/child/ChildExecutorFactory.java @@ -15,11 +15,19 @@ * See the License for the specific language governing permissions and * limitations under the License. */ -package org.apache.hadoop.realtime.event; +package org.apache.hadoop.realtime.child; + +import org.apache.hadoop.realtime.records.TaskType; /** - * Receives and persists the terminal {@link Event} */ -public interface EventSource { - public void nextEvent(); +public final class ChildExecutorFactory { + public static ChildExecutor newExecutor(TaskType type) { + if (type == TaskType.MAP) + return new MapChildExecutor(); + else if (type == TaskType.MAP) + return new ReduceChildExecutor(); + else + return null; // or throw an exception? + } } diff --git a/hadoop-dragon-core/src/main/java/org/apache/hadoop/realtime/child/Context.java b/hadoop-dragon-core/src/main/java/org/apache/hadoop/realtime/child/Context.java new file mode 100644 index 0000000..e7716b6 --- /dev/null +++ b/hadoop-dragon-core/src/main/java/org/apache/hadoop/realtime/child/Context.java @@ -0,0 +1,92 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.hadoop.realtime.child; + +import java.io.IOException; +import java.util.concurrent.TimeUnit; + +import org.apache.hadoop.classification.InterfaceAudience; +import org.apache.hadoop.classification.InterfaceStability; +import org.apache.hadoop.realtime.conf.DragonConfiguration; +import org.apache.hadoop.realtime.event.Event; +import org.apache.hadoop.realtime.mr.Mapper; +import org.apache.hadoop.realtime.mr.Reducer; +import org.apache.hadoop.realtime.records.TaskAttemptId; +import org.apache.hadoop.realtime.records.TaskType; + +/** + * A context object that allows input and output from the task. It is only + * supplied to the {@link Mapper} or {@link Reducer}. + * @param the input key type for the task + * @param the input value type for the task + * @param the output key type for the task + * @param the output value type for the task + */ +@InterfaceAudience.Public +@InterfaceStability.Evolving +public interface Context { + + DragonConfiguration getConfiguration(); + + TaskAttemptId getTaskAttemptId(); + + /** + * return the partition number of this task + * @return + */ + int getPartition(); + + String getUser(); + + + TaskType getTaskType(); + + /** + * take a event from the queue. + * + * @return the current key object or null if there isn't one + * @throws IOException + * @throws InterruptedException + */ + public Event pollEvent() throws IOException, + InterruptedException; + + /** + * take a event from event producer + * + * @return the current key object or null if there isn't one + * @throws IOException + * @throws InterruptedException + */ + public Event pollEvent(long timeout, TimeUnit unit) + throws IOException, InterruptedException; + + /** + * Generate an output event + */ + public boolean emitEvent(Event event) throws IOException, + InterruptedException; + + /** + * Generate an output event + */ + public boolean emitEvent(Event event, long timeout, + TimeUnit unit) throws IOException, InterruptedException; + +} diff --git a/hadoop-dragon-core/src/main/java/org/apache/hadoop/realtime/child/DragonChild.java b/hadoop-dragon-core/src/main/java/org/apache/hadoop/realtime/child/DragonChild.java index 5b70657..c2f837e 100644 --- a/hadoop-dragon-core/src/main/java/org/apache/hadoop/realtime/child/DragonChild.java +++ b/hadoop-dragon-core/src/main/java/org/apache/hadoop/realtime/child/DragonChild.java @@ -39,12 +39,12 @@ import org.apache.hadoop.metrics2.lib.DefaultMetricsSystem; import org.apache.hadoop.metrics2.source.JvmMetrics; import org.apache.hadoop.realtime.DragonJobConfig; +import org.apache.hadoop.realtime.conf.DragonConfiguration; import org.apache.hadoop.realtime.protocol.DragonChildProtocol; import org.apache.hadoop.realtime.protocol.records.GetTaskRequest; import org.apache.hadoop.realtime.protocol.records.GetTaskResponse; import org.apache.hadoop.realtime.records.ChildExecutionContext; import org.apache.hadoop.realtime.records.TaskAttemptId; -import org.apache.hadoop.realtime.records.TaskId; import org.apache.hadoop.realtime.records.TaskReport; import org.apache.hadoop.realtime.security.TokenCache; import org.apache.hadoop.realtime.security.token.JobTokenIdentifier; @@ -75,7 +75,7 @@ class DragonChild { public static void main(String[] args) throws Throwable { LOG.debug("Child starting"); - final Configuration defaultConf = new Configuration(); + final DragonConfiguration defaultConf = new DragonConfiguration(); defaultConf.addResource(DragonJobConfig.JOB_CONF_FILE); UserGroupInformation.setConfiguration(defaultConf); @@ -150,7 +150,9 @@ public Object run() throws Exception { defaultConf.set(DragonJobConfig.WORKING_DIR, workDir); } Path workPath = new Path(workDir); - ChildExecutor.run(defaultConf, amProxy, context); // run the task + ChildExecutor executor = ChildExecutorFactory.newExecutor( + context.getTaskType()); + executor.execute(defaultConf, amProxy, context); // run the task return null; } }); diff --git a/hadoop-dragon-core/src/main/java/org/apache/hadoop/realtime/child/MapChildExecutor.java b/hadoop-dragon-core/src/main/java/org/apache/hadoop/realtime/child/MapChildExecutor.java new file mode 100644 index 0000000..90d9383 --- /dev/null +++ b/hadoop-dragon-core/src/main/java/org/apache/hadoop/realtime/child/MapChildExecutor.java @@ -0,0 +1,117 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.hadoop.realtime.child; + +import java.io.IOException; +import java.util.List; +import java.util.concurrent.TimeUnit; + +import org.apache.hadoop.conf.Configuration; +import org.apache.hadoop.fs.Path; +import org.apache.hadoop.fs.FileSystem.Statistics; +import org.apache.hadoop.realtime.DragonJobConfig; +import org.apache.hadoop.realtime.conf.DragonConfiguration; +import org.apache.hadoop.realtime.event.Event; +import org.apache.hadoop.realtime.event.EventEmitter; +import org.apache.hadoop.realtime.event.EventProducer; +import org.apache.hadoop.realtime.mr.MapContext; +import org.apache.hadoop.realtime.mr.Mapper; +import org.apache.hadoop.realtime.protocol.DragonChildProtocol; +import org.apache.hadoop.realtime.records.ChildExecutionContext; +import org.apache.hadoop.util.ReflectionUtils; + +/** + */ +final class MapChildExecutor extends ChildExecutor { + + private static class DirectOutputCollector implements + EventEmitter { + private final EventEmitter out; + private final List fsStats; + // counters + + @SuppressWarnings("unchecked") + DirectOutputCollector(DragonConfiguration conf) throws IOException, + ClassNotFoundException, InterruptedException { + this.out = null; + String name = conf.get(DragonJobConfig.JOB_OUTPUT_DIR); + this.fsStats = + getFsStatistics(name == null ? null : new Path(name), conf); + } + + @Override + public void close() throws IOException { + if (out != null) { + out.close(); + } + } + + private long getOutputBytes(List stats) { + if (stats == null) + return 0; + long bytesWritten = 0; + for (Statistics stat : stats) { + bytesWritten = bytesWritten + stat.getBytesWritten(); + } + return bytesWritten; + } + + @Override + public boolean emitEvent(Event event) throws IOException, + InterruptedException { + return out.emitEvent(event); + } + + @Override + public boolean emitEvent(Event event, long timeout, TimeUnit unit) + throws IOException, InterruptedException { + return out.emitEvent(event, timeout, unit); + } + } + + @Override + @SuppressWarnings("unchecked") + protected void execute( + final Configuration conf, final DragonChildProtocol proxy, + final ChildExecutionContext context) throws IOException, + InterruptedException { + super.execute(conf, proxy, context); + + // make a mapper + Mapper mapper = + (Mapper) ReflectionUtils.newInstance( + conf.getClass(DragonJobConfig.JOB_MAP_CLASS, Mapper.class), conf); + // fetch the input sub-dirs + + // make the event producer + EventProducer input = null; + + // if reduce number = 0 , then output = DirectOutputCollector + // else output = OutputCollector + + // make a map context + MapContext mapContext = null; + + // initialize the event producer + // run the mapper + mapper.run(mapContext); + // close the event producer + + } + +} diff --git a/hadoop-dragon-core/src/main/java/org/apache/hadoop/realtime/child/ReduceChildExecutor.java b/hadoop-dragon-core/src/main/java/org/apache/hadoop/realtime/child/ReduceChildExecutor.java new file mode 100644 index 0000000..e018f86 --- /dev/null +++ b/hadoop-dragon-core/src/main/java/org/apache/hadoop/realtime/child/ReduceChildExecutor.java @@ -0,0 +1,64 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.hadoop.realtime.child; + +import java.io.IOException; + +import org.apache.hadoop.conf.Configuration; +import org.apache.hadoop.realtime.DragonJobConfig; +import org.apache.hadoop.realtime.event.EventProducer; +import org.apache.hadoop.realtime.mr.MapContext; +import org.apache.hadoop.realtime.mr.Mapper; +import org.apache.hadoop.realtime.protocol.DragonChildProtocol; +import org.apache.hadoop.realtime.records.ChildExecutionContext; +import org.apache.hadoop.util.ReflectionUtils; + +/** + */ +public class ReduceChildExecutor extends ChildExecutor { + + @Override + @SuppressWarnings("unchecked") + protected void execute( + final Configuration conf, final DragonChildProtocol proxy, + final ChildExecutionContext context) throws IOException, + InterruptedException { + super.execute(conf, proxy, context); + + // make a mapper + Mapper mapper = + (Mapper) ReflectionUtils.newInstance( + conf.getClass(DragonJobConfig.JOB_MAP_CLASS, Mapper.class), conf); + // fetch the input sub-dirs + + // make the event producer + EventProducer input = null; + + // if reduce number = 0 , then output = DirectOutputCollector + // else output = OutputCollector + + // make a map context + MapContext mapContext = null; + + // initialize the event producer + // run the mapper + mapper.run(mapContext); + // close the event producer + + } +} diff --git a/hadoop-dragon-core/src/main/java/org/apache/hadoop/realtime/dag/DirectedAcyclicGraph.java b/hadoop-dragon-core/src/main/java/org/apache/hadoop/realtime/dag/DirectedAcyclicGraph.java deleted file mode 100644 index 93efa31..0000000 --- a/hadoop-dragon-core/src/main/java/org/apache/hadoop/realtime/dag/DirectedAcyclicGraph.java +++ /dev/null @@ -1,634 +0,0 @@ -/** - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package org.apache.hadoop.realtime.dag; - -import java.io.Serializable; -import java.util.ArrayList; -import java.util.Collection; -import java.util.Collections; -import java.util.ConcurrentModificationException; -import java.util.HashSet; -import java.util.Iterator; -import java.util.LinkedHashMap; -import java.util.List; -import java.util.Map; -import java.util.NoSuchElementException; -import java.util.Set; -import java.util.SortedSet; -import java.util.TreeSet; - -/** - *

- * DirectedAcyclicGraph implements a DAG that can be modified (vertices & - * edges added and removed), is guaranteed to remain acyclic, and provides fast - * topological order iteration. - *

- * - *

- * This class makes no claims to thread safety, and concurrent usage from - * multiple threads will produce undefined results. - *

- */ -public class DirectedAcyclicGraph implements Serializable { - private static final long serialVersionUID = -8352536912775689101L; - - /* vertex -> internal vertex */ - private Map> vertexMap = - new LinkedHashMap>(); - /* edge -> internal edge */ - private Map> edgeMap = - new LinkedHashMap>(); - - private transient Set unmodifiableEdgeSet = null; - private transient Set unmodifiableVertexSet = null; - - private EdgeFactory edgeFactory; - - private TopoComparator topoComparator; - - private TopoOrderMap topoOrderMap; - - private int maxTopoIndex = 0; - private int minTopoIndex = 0; - - // this update count is used to keep internal topological iterators honest - private long topologyUpdateCount = 0; - - protected DirectedAcyclicGraph() { - } - - /** - * @param edgeClass - */ - public DirectedAcyclicGraph(Class edgeClass) { - this(new EdgeFactory(edgeClass)); - } - - public DirectedAcyclicGraph(EdgeFactory ef) { - if (ef == null) { - throw new NullPointerException(); - } - edgeFactory = ef; - initialize(); - } - - /** - * set the topoOrderMap based on the current factory, and create the - * comparator; - */ - private void initialize() - { - topoOrderMap = TopoOrderMap.getInstance(); - topoComparator = new TopoComparator(topoOrderMap); - } - - /** - * Ensures that the specified vertex exists in this graph, or else throws - * exception. - * - * @param v vertex - * - * @return true if this assertion holds. - * - * @throws NullPointerException if specified vertex is null. - * @throws IllegalArgumentException if specified vertex does not exist in - * this graph. - */ - protected boolean assertVertexExist(V v) - { - if (containsVertex(v)) { - return true; - } else if (v == null) { - throw new NullPointerException(); - } else { - throw new IllegalArgumentException("no such vertex in graph"); - } - } - - /** - * A lazy build of edge container for specified vertex. - * - * @param vertex a vertex in this graph. - */ - private InternalVertex getInternalVertex(V vertex) { - assertVertexExist(vertex); - - InternalVertex ec = vertexMap.get(vertex); - - if (ec == null) { - ec = new InternalVertex(); - vertexMap.put(vertex, ec); - } - - return ec; - } - - public V getEdgeSource(E e) { - InternalEdge internalEdge = edgeMap.get(e); - // nullable? - return internalEdge.source; - } - - public V getEdgeTarget(E e) { - InternalEdge internalEdge = edgeMap.get(e); - // nullable? - return internalEdge.target; - } - - /** - * iterator will traverse the vertices in topological order, meaning that - * for a directed graph G = (V,E), if there exists a path from vertex va to - * vertex vb then va is guaranteed to come before vertex vb in the iteration - * order. - * - * @return an iterator that will traverse the graph in topological order - */ - public Iterator iterator() { - return new TopoIterator(); - } - - /** - * Adds the specified vertex to this graph if not already present, and puts it - * at the top of the internal topological vertex ordering. More formally, adds - * the specified vertex, v, to this graph if this graph contains - * no vertex u such that - * u.equals(v). If this graph already contains such vertex, the call - * leaves this graph unchanged and returns false. In combination with - * the restriction on constructors, this ensures that graphs never contain - * duplicate vertices. - * - * @param v vertex to be added to this graph. - * - * @return true if this graph did not already contain the specified - * vertex. - * - * @throws NullPointerException if the specified vertex is - * null. - */ - public boolean addVertex(V v) { - boolean added = addVertexInternal(v); - if (added) { - // add to the top - ++maxTopoIndex; - topoOrderMap.putVertex(maxTopoIndex, v); - - ++topologyUpdateCount; - } - - return added; - } - - protected boolean addVertexInternal(V v) { - if (v == null) { - throw new NullPointerException(); - } else if (containsVertex(v)) { - return false; - } else { - vertexMap.put(v, new InternalVertex()); - return true; - } - } - - /** - *

- * Adds the given edge and updates the internal topological order for - * consistency IFF - * - *

    - *
  • there is not already an edge (fromVertex, toVertex) in the graph - *
  • the edge does not induce a cycle in the graph - *
- *

- * - * @return null if the edge is already in the graph, else the created edge is - * returned - * - * @throws IllegalArgumentException If either fromVertex or toVertex is not a - * member of the graph - * @throws CycleFoundException if the edge would induce a cycle in the graph - */ - public E addEdge(V fromVertex, V toVertex) - throws CycleFoundException { - // we don't need to check the existence of those vertices here - addVertex(fromVertex); - addVertex(toVertex); - - Integer lb = topoOrderMap.getTopologicalIndex(toVertex); - Integer ub = topoOrderMap.getTopologicalIndex(fromVertex); - - if (lb < ub) { - Set df = new HashSet(); - Set db = new HashSet(); - - // Discovery - Region affectedRegion = new Region(lb, ub); - Visited visited = Visited.getInstance(affectedRegion); - - // throws CycleFoundException if there is a cycle - dfsF(toVertex, df, visited, affectedRegion); - - dfsB(fromVertex, db, visited, affectedRegion); - reorder(df, db, visited); - ++topologyUpdateCount; // if we do a reorder, than the topology has - // been updated - } - - return addEdgeInternal(fromVertex, toVertex); - } - - - protected E addEdgeInternal(V fromVertex, V toVertex) { - E e = edgeFactory.createEdge(fromVertex, toVertex); - if (containsEdge(e)) { // this restriction should stay! - return null; - } else { - InternalEdge internalEdge = new InternalEdge(fromVertex, toVertex); - edgeMap.put(e, internalEdge); - vertexMap.get(fromVertex).addOutgoingEdge(e); - vertexMap.get(toVertex).addIncomingEdge(e); - return e; - } - } - - public boolean containsEdge(E e) { - return edgeMap.containsKey(e); - } - - public boolean containsVertex(V v) { - return vertexMap.containsKey(v); - } - - public boolean removeEdge(E e) { - if (containsEdge(e)) { - InternalEdge ie = edgeMap.get(e); - vertexMap.get(ie.getSource()).removeOutgoingEdge(e); - vertexMap.get(ie.getTarget()).removeOutgoingEdge(e); - edgeMap.remove(e); - return true; - } else { - return false; - } - } - - /** - */ - public boolean removeAllEdges(Collection edges) { - boolean modified = false; - - for (E e : edges) { - modified |= removeEdge(e); - } - - return modified; - } - - /** - * Removes the specified vertex from this graph including all its touching - * edges if present. More formally, if the graph contains a vertex - * u such that u.equals(v), the call removes all edges - * that touch u and then removes u itself. If no - * such u is found, the call leaves the graph unchanged. - * Returns true if the graph contained the specified vertex. (The - * graph will not contain the specified vertex once the call returns). - * - *

If the specified vertex is null returns - * false.

- * - * @param v vertex to be removed from this graph, if present. - * - * @return true if the graph contained the specified vertex; - * false otherwise. - */ - public boolean removeVertex(V v) { - boolean removed = removeVertexInternal(v); - if (removed) { - Integer topoIndex = topoOrderMap.removeVertex(v); - - // contract minTopoIndex as we are able - if (topoIndex == minTopoIndex) { - while ((minTopoIndex < 0) - && (null == topoOrderMap.getVertex(minTopoIndex))) { - ++minTopoIndex; - } - } - - // contract maxTopoIndex as we are able - if (topoIndex == maxTopoIndex) { - while ((maxTopoIndex > 0) - && (null == topoOrderMap.getVertex(maxTopoIndex))) { - --maxTopoIndex; - } - } - - ++topologyUpdateCount; - } - - return removed; - } - - protected boolean removeVertexInternal(V v) { - if (containsVertex(v)) { - InternalVertex iv = vertexMap.get(v); - removeAllEdges(iv.getUnmodifiableIncomingEdges()); - removeAllEdges(iv.getUnmodifiableOutgoingEdges()); - vertexMap.remove(v); - return true; - } else { - return false; - } - } - - public boolean removeAllVertices(Collection vertices) { - boolean removed = removeAllVerticesInternal(vertices); - - topoOrderMap.removeAllVertices(); - - maxTopoIndex = 0; - minTopoIndex = 0; - - ++topologyUpdateCount; - - return removed; - } - - /** - */ - public boolean removeAllVerticesInternal(Collection vertices) { - boolean modified = false; - - for (V v : vertices) { - modified |= removeVertex(v); - } - - return modified; - } - - /** - * @see DirectedGraph#incomingEdges(Object) - */ - public Set incomingEdgesOf(V vertex) { - return getInternalVertex(vertex).getUnmodifiableIncomingEdges(); - } - - /** - * @see DirectedGraph#incomingEdges(Object) - */ - public Set outgoingEdgesOf(V vertex) { - return getInternalVertex(vertex).getUnmodifiableOutgoingEdges(); - } - - - public Set vertexSet() { - if (unmodifiableVertexSet == null) { - unmodifiableVertexSet = Collections.unmodifiableSet(vertexMap.keySet()); - } - - return unmodifiableVertexSet; - } - - public Set edgeSet() { - if (unmodifiableEdgeSet == null) { - unmodifiableEdgeSet = Collections.unmodifiableSet(edgeMap.keySet()); - } - - return unmodifiableEdgeSet; - } - - - /** - * Depth first search forward, building up the set (df) of forward-connected - * vertices in the Affected Region - * - * @param vertex the vertex being visited - * @param df the set we are populating with forward connected vertices in - * the Affected Region - * @param visited a simple data structure that lets us know if we already - * visited a node with a given topo index - * @param topoIndexMap for quick lookups, a map from vertex to topo index in - * the AR - * @param ub the topo index of the original fromVertex -- used for cycle - * detection - * - * @throws CycleFoundException if a cycle is discovered - */ - private void - dfsF(V vertex, Set df, Visited visited, Region affectedRegion) - throws CycleFoundException { - int topoIndex = topoOrderMap.getTopologicalIndex(vertex); - - // Assumption: vertex is in the AR and so it will be in visited - visited.setVisited(topoIndex); - - df.add(vertex); - - for (E outEdge : outgoingEdgesOf(vertex)) { - V nextVertex = getEdgeTarget(outEdge); - Integer nextVertexTopoIndex = - topoOrderMap.getTopologicalIndex(nextVertex); - - if (nextVertexTopoIndex.intValue() == affectedRegion.finish) { - // reset visited - try { - for (V visitedVertex : df) { - visited.clearVisited(topoOrderMap - .getTopologicalIndex(visitedVertex)); - } - } catch (UnsupportedOperationException e) { - // okay, fine, some implementations (ones that automatically - // clear themselves out) don't work this way - } - throw new CycleFoundException(); - } - - // note, order of checks is important as we need to make sure the - // vertex is in the affected region before we check its visited - // status (otherwise we will be causing an - // ArrayIndexOutOfBoundsException). - if (affectedRegion.isIn(nextVertexTopoIndex) - && !visited.getVisited(nextVertexTopoIndex)) { - dfsF(nextVertex, df, visited, affectedRegion); // recurse - } - } - } - - /** - * Depth first search backward, building up the set (db) of back-connected - * vertices in the Affected Region - * - * @param vertex the vertex being visited - * @param db the set we are populating with back-connected vertices in the AR - * @param visited - * @param topoIndexMap - */ - private void - dfsB(V vertex, Set db, Visited visited, Region affectedRegion) { - // Assumption: vertex is in the AR and so we will get a topoIndex from - // the map - int topoIndex = topoOrderMap.getTopologicalIndex(vertex); - visited.setVisited(topoIndex); - - db.add(vertex); - - for (E inEdge : incomingEdgesOf(vertex)) { - V previousVertex = getEdgeSource(inEdge); - Integer previousVertexTopoIndex = - topoOrderMap.getTopologicalIndex(previousVertex); - - // note, order of checks is important as we need to make sure the - // vertex is in the affected region before we check its visited - // status (otherwise we will be causing an - // ArrayIndexOutOfBoundsException). - if (affectedRegion.isIn(previousVertexTopoIndex) - && !visited.getVisited(previousVertexTopoIndex)) { - // if prevousVertexTopoIndex != null, the vertex is in the - // Affected Region according to our topoIndexMap - - dfsB(previousVertex, db, visited, affectedRegion); - } - } - } - - @SuppressWarnings("unchecked") - private void reorder(Set df, Set db, Visited visited) { - List topoDf = new ArrayList(df); - List topoDb = new ArrayList(db); - - Collections.sort(topoDf, topoComparator); - Collections.sort(topoDb, topoComparator); - - // merge these suckers together in topo order - - SortedSet availableTopoIndices = new TreeSet(); - - // we have to cast to the generic type, can't do "new V[size]" in java - // 5; - V[] bigL = (V[]) new Object[df.size() + db.size()]; - int lIndex = 0; // this index is used for the sole purpose of pushing - // into - - // the correct index of bigL - - // assume (for now) that we are resetting visited - boolean clearVisited = true; - - for (V vertex : topoDb) { - Integer topoIndex = topoOrderMap.getTopologicalIndex(vertex); - - // add the available indices to the set - availableTopoIndices.add(topoIndex); - - bigL[lIndex++] = vertex; - - if (clearVisited) { // reset visited status if supported - try { - visited.clearVisited(topoIndex); - } catch (UnsupportedOperationException e) { - clearVisited = false; - } - } - } - - for (V vertex : topoDf) { - Integer topoIndex = topoOrderMap.getTopologicalIndex(vertex); - - // add the available indices to the set - availableTopoIndices.add(topoIndex); - bigL[lIndex++] = vertex; - - if (clearVisited) { // reset visited status if supported - try { - visited.clearVisited(topoIndex); - } catch (UnsupportedOperationException e) { - clearVisited = false; - } - } - } - - lIndex = 0; // reusing lIndex - for (Integer topoIndex : availableTopoIndices) { - // assign the indexes to the elements of bigL in order - V vertex = bigL[lIndex++]; // note the post-increment - topoOrderMap.putVertex(topoIndex, vertex); - } - } - - /** - * iterator which follows topological order - */ - private class TopoIterator implements Iterator { - private int currentTopoIndex; - private final long updateCountAtCreation; - private Integer nextIndex = null; - - public TopoIterator() { - updateCountAtCreation = topologyUpdateCount; - currentTopoIndex = minTopoIndex - 1; - } - - public boolean hasNext() { - if (updateCountAtCreation != topologyUpdateCount) { - throw new ConcurrentModificationException(); - } - - nextIndex = getNextIndex(); - return nextIndex != null; - } - - public V next() { - if (updateCountAtCreation != topologyUpdateCount) { - throw new ConcurrentModificationException(); - } - - if (nextIndex == null) { - // find nextIndex - nextIndex = getNextIndex(); - } - if (nextIndex == null) { - throw new NoSuchElementException(); - } - currentTopoIndex = nextIndex; - nextIndex = null; - return topoOrderMap.getVertex(currentTopoIndex); // topoToVertex.get(currentTopoIndex); - } - - public void remove() { - if (updateCountAtCreation != topologyUpdateCount) { - throw new ConcurrentModificationException(); - } - - V vertexToRemove = null; - if (null != (vertexToRemove = topoOrderMap.getVertex(currentTopoIndex))) { - topoOrderMap.removeVertex(vertexToRemove); - } else { - // should only happen if next() hasn't been called - throw new IllegalStateException(); - } - } - - private Integer getNextIndex() { - for (int i = currentTopoIndex + 1; i <= maxTopoIndex; i++) { - if (null != topoOrderMap.getVertex(i)) { - return i; - } - } - return null; - } - } - -} diff --git a/hadoop-dragon-core/src/main/java/org/apache/hadoop/realtime/dag/EdgeFactory.java b/hadoop-dragon-core/src/main/java/org/apache/hadoop/realtime/dag/EdgeFactory.java deleted file mode 100644 index 6bed285..0000000 --- a/hadoop-dragon-core/src/main/java/org/apache/hadoop/realtime/dag/EdgeFactory.java +++ /dev/null @@ -1,45 +0,0 @@ -/** - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package org.apache.hadoop.realtime.dag; - -import java.io.Serializable; - -/** - * An {@link EdgeFactory} for producing edges by using a class as a factory. - */ -public class EdgeFactory implements Serializable { - private static final long serialVersionUID = -7890706652865009651L; - - private final Class edgeClass; - - public EdgeFactory(Class edgeClass) { - this.edgeClass = edgeClass; - } - - /** - * @see EdgeFactory#createEdge(Object, Object) - */ - public E createEdge(V source, V target) { - try { - return edgeClass.newInstance(); - } catch (Exception ex) { - throw new RuntimeException("Edge factory failed", ex); - } - } -} \ No newline at end of file diff --git a/hadoop-dragon-core/src/main/java/org/apache/hadoop/realtime/dag/InternalEdge.java b/hadoop-dragon-core/src/main/java/org/apache/hadoop/realtime/dag/InternalEdge.java deleted file mode 100644 index db1c97e..0000000 --- a/hadoop-dragon-core/src/main/java/org/apache/hadoop/realtime/dag/InternalEdge.java +++ /dev/null @@ -1,52 +0,0 @@ -/** - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ -package org.apache.hadoop.realtime.dag; - -import java.io.Serializable; - -/** - * An internal view of edges from a {@link DirectedAcyclicGraph} - */ -class InternalEdge implements Serializable { - private static final long serialVersionUID = 1917792937475311485L; - - V source; - V target; - - InternalEdge(V source, V target) { - this.source = source; - this.target = target; - } - - /** - * Get the source vertex of this edge. - * @return the source vertex of this edge. - */ - public V getSource() { - return source; - } - - /** - * Get the target vertex of this edge. - * @return the target vertex of this edge. - */ - public V getTarget() { - return target; - } - -} \ No newline at end of file diff --git a/hadoop-dragon-core/src/main/java/org/apache/hadoop/realtime/dag/InternalVertex.java b/hadoop-dragon-core/src/main/java/org/apache/hadoop/realtime/dag/InternalVertex.java deleted file mode 100644 index 18e1b36..0000000 --- a/hadoop-dragon-core/src/main/java/org/apache/hadoop/realtime/dag/InternalVertex.java +++ /dev/null @@ -1,64 +0,0 @@ -package org.apache.hadoop.realtime.dag; - -import java.io.Serializable; -import java.util.Collections; -import java.util.HashSet; -import java.util.Set; - -/** - * An internal view of vertices. - */ -class InternalVertex implements Serializable { - private static final long serialVersionUID = 337763367843853900L; - - Set incoming; - Set outgoing; - private transient Set unmodifiableIncoming = null; - private transient Set unmodifiableOutgoing = null; - - InternalVertex() { - incoming = new HashSet(); - outgoing = new HashSet(); - } - - /** - * A lazy build of unmodifiable incoming edge set. - * - * @return - */ - public Set getUnmodifiableIncomingEdges() { - if (unmodifiableIncoming == null) { - unmodifiableIncoming = Collections.unmodifiableSet(incoming); - } - return unmodifiableIncoming; - } - - /** - * A lazy build of unmodifiable outgoing edge set. - * - * @return - */ - public Set getUnmodifiableOutgoingEdges() { - if (unmodifiableOutgoing == null) { - unmodifiableOutgoing = Collections.unmodifiableSet(outgoing); - } - - return unmodifiableOutgoing; - } - - public void addIncomingEdge(E e) { - incoming.add(e); - } - - public void addOutgoingEdge(E e) { - outgoing.add(e); - } - - public void removeIncomingEdge(E e) { - incoming.remove(e); - } - - public void removeOutgoingEdge(E e) { - outgoing.remove(e); - } -} \ No newline at end of file diff --git a/hadoop-dragon-core/src/main/java/org/apache/hadoop/realtime/dag/TopoComparator.java b/hadoop-dragon-core/src/main/java/org/apache/hadoop/realtime/dag/TopoComparator.java deleted file mode 100644 index 96f26e3..0000000 --- a/hadoop-dragon-core/src/main/java/org/apache/hadoop/realtime/dag/TopoComparator.java +++ /dev/null @@ -1,23 +0,0 @@ -package org.apache.hadoop.realtime.dag; - -import java.io.Serializable; -import java.util.Comparator; - -/** - * Note, this is a lazy and incomplete implementation, with assumptions that - * inputs are in the given topoIndexMap - */ -class TopoComparator implements Comparator, Serializable{ - private static final long serialVersionUID = 7091580997211005045L; - - private TopoOrderMap topoOrderMap; - - public TopoComparator(TopoOrderMap topoOrderMap) { - this.topoOrderMap = topoOrderMap; - } - - public int compare(V v1, V v2) { - return topoOrderMap.getTopologicalIndex(v1).compareTo( - topoOrderMap.getTopologicalIndex(v2)); - } -} \ No newline at end of file diff --git a/hadoop-dragon-core/src/main/java/org/apache/hadoop/realtime/dag/TopoOrderMap.java b/hadoop-dragon-core/src/main/java/org/apache/hadoop/realtime/dag/TopoOrderMap.java deleted file mode 100644 index b5a3f52..0000000 --- a/hadoop-dragon-core/src/main/java/org/apache/hadoop/realtime/dag/TopoOrderMap.java +++ /dev/null @@ -1,73 +0,0 @@ -package org.apache.hadoop.realtime.dag; - -import java.io.Serializable; -import java.util.ArrayList; -import java.util.HashMap; -import java.util.List; -import java.util.Map; - -/** - * For performance and flexibility uses an ArrayList for topological index to - * vertex mapping, and a HashMap for vertex to topological index mapping. - */ -class TopoOrderMap implements Serializable { - private static final long serialVersionUID = 7834322521006147372L; - - private final List topoToVertex = new ArrayList(); - private final Map vertexToTopo = - new HashMap(); - - public static TopoOrderMap getInstance() { - return new TopoOrderMap(); - } - - public void putVertex(Integer index, V vertex) { - int translatedIndex = translateIndex(index); - - // grow topoToVertex as needed to accommodate elements - while ((translatedIndex + 1) > topoToVertex.size()) { - topoToVertex.add(null); - } - - topoToVertex.set(translatedIndex, vertex); - vertexToTopo.put(vertex, index); - } - - public V getVertex(Integer index) { - return topoToVertex.get(translateIndex(index)); - } - - public Integer getTopologicalIndex(V vertex) { - return vertexToTopo.get(vertex); - } - - public Integer removeVertex(V vertex) { - Integer topoIndex = vertexToTopo.remove(vertex); - if (topoIndex != null) { - topoToVertex.set(translateIndex(topoIndex), null); - } - return topoIndex; - } - - public void removeAllVertices() { - vertexToTopo.clear(); - topoToVertex.clear(); - } - - /** - * We translate the topological index to an ArrayList index. We have to do - * this because topological indices can be negative, and we want to do it - * because we can make better use of space by only needing an ArrayList of - * size |AR|. - * - * @param unscaledIndex - * - * @return the ArrayList index - */ - private final int translateIndex(int index) { - if (index >= 0) { - return 2 * index; - } - return -1 * ((index * 2) - 1); - } -} \ No newline at end of file diff --git a/hadoop-dragon-core/src/main/java/org/apache/hadoop/realtime/dag/Visited.java b/hadoop-dragon-core/src/main/java/org/apache/hadoop/realtime/dag/Visited.java deleted file mode 100644 index 10fc9ef..0000000 --- a/hadoop-dragon-core/src/main/java/org/apache/hadoop/realtime/dag/Visited.java +++ /dev/null @@ -1,75 +0,0 @@ -/** - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ -package org.apache.hadoop.realtime.dag; - -import java.util.ArrayList; -import java.util.List; - -/** - */ -class Visited { - private final List visited = new ArrayList(); - - private Region affectedRegion; - - protected Visited(Region affectedRegion) { - // Make sure visited is big enough - int minSize = (affectedRegion.finish - affectedRegion.start) + 1; - /* plus one because the region range is inclusive of both indices */ - - while (visited.size() < minSize) { - visited.add(Boolean.FALSE); - } - - this.affectedRegion = affectedRegion; - } - - public static Visited getInstance(Region affectedRegion) { - return new Visited(affectedRegion); - } - - public void setVisited(int index) { - visited.set(translateIndex(index), Boolean.TRUE); - } - - public boolean getVisited(int index) { - Boolean result = null; - - result = visited.get(translateIndex(index)); - - return result; - } - - public void clearVisited(int index) throws UnsupportedOperationException { - visited.set(translateIndex(index), Boolean.FALSE); - } - - /** - * We translate the topological index to an ArrayList index. We have to do - * this because topological indices can be negative, and we want to do it - * because we can make better use of space by only needing an ArrayList of - * size |AR|. - * - * @param unscaledIndex - * - * @return the ArrayList index - */ - private int translateIndex(int index) { - return index - affectedRegion.start; - } -} diff --git a/hadoop-dragon-core/src/main/java/org/apache/hadoop/realtime/event/DefaultEventProducer.java b/hadoop-dragon-core/src/main/java/org/apache/hadoop/realtime/event/DefaultEventProducer.java new file mode 100644 index 0000000..abf2263 --- /dev/null +++ b/hadoop-dragon-core/src/main/java/org/apache/hadoop/realtime/event/DefaultEventProducer.java @@ -0,0 +1,264 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.hadoop.realtime.event; + +import java.io.Closeable; +import java.io.IOException; +import java.text.NumberFormat; +import java.util.ArrayList; +import java.util.Arrays; +import java.util.Collections; +import java.util.Comparator; +import java.util.List; +import java.util.concurrent.BlockingDeque; +import java.util.concurrent.ExecutorService; +import java.util.concurrent.Executors; +import java.util.concurrent.LinkedBlockingDeque; +import java.util.concurrent.TimeUnit; +import java.util.regex.Pattern; + +import org.apache.commons.logging.Log; +import org.apache.commons.logging.LogFactory; +import org.apache.hadoop.conf.Configuration; +import org.apache.hadoop.fs.FSDataInputStream; +import org.apache.hadoop.fs.FileStatus; +import org.apache.hadoop.fs.FileSystem; +import org.apache.hadoop.fs.Path; +import org.apache.hadoop.fs.PathFilter; +import org.apache.hadoop.realtime.fs.StandardWatchEventKinds; +import org.apache.hadoop.realtime.fs.WatchEvent; +import org.apache.hadoop.realtime.fs.WatchKey; +import org.apache.hadoop.realtime.fs.WatchService; +import org.apache.hadoop.realtime.fs.WatchServiceFactory; +import org.apache.hadoop.realtime.records.ChildExecutionContext; + +/** + */ +public class DefaultEventProducer implements + EventProducer, Closeable { + private static final Log LOG = LogFactory.getLog(DefaultEventProducer.class); + + private static final NumberFormat fileNameFormat; + static { + fileNameFormat = NumberFormat.getInstance(); + fileNameFormat.setMinimumIntegerDigits(20); + fileNameFormat.setMaximumFractionDigits(0); + fileNameFormat.setGroupingUsed(false); + } + private static final String FILE_SUFFIX = ".drg"; + /* 20 digitals + FILE_SUFFIX */ + private static final Pattern FILE_PATTERN = Pattern + .compile("\\d{20}\\" + FILE_SUFFIX); + + private volatile boolean running = true; + + private final Configuration conf; + private final FileSystem fileSystem; + private final Path parentDir; + + private FSDataInputStream in; + + private WatchService watchService; + + private final BlockingDeque fifoQueue = + new LinkedBlockingDeque(); + + private final ExecutorService singleExecutor = Executors + .newSingleThreadExecutor(); + + public DefaultEventProducer(final Configuration conf, final Path path, + final int bufferSize) throws IOException { + this.conf = conf; + this.fileSystem = FileSystem.get(conf); + this.parentDir = path; + startWatchService(); + } + + + public DefaultEventProducer(final Configuration conf, final Path path, + final int bufferSize, final long startOffset) throws IOException { + this.conf = conf; + this.fileSystem = FileSystem.get(conf); + this.parentDir = path; + initByOffset(fileSystem, parentDir, startOffset); + startWatchService(); + } + + @Override + public void initialize(ChildExecutionContext context) throws IOException, + InterruptedException { + // TODO Auto-generated method stub + + } + + /** + * take a event from the queue. + * + * @return the current key object or null if there isn't one + * @throws IOException + * @throws InterruptedException + */ + @Override + public Event pollEvent() throws IOException, + InterruptedException { + FSDataInputStream input = getInputStream(); + if (input == null) { + return null; + } + + return null; + } + + /** + * take a event from the queue. + * + * @return the current key object or null if there isn't one + * @throws IOException + * @throws InterruptedException + */ + @Override + public Event pollEvent(long timeout, TimeUnit unit) + throws IOException, InterruptedException { + return null; + } + + private void startWatchService() throws IOException { + this.watchService = WatchServiceFactory.newWatchService(this.conf); + this.watchService.register(this.parentDir, + StandardWatchEventKinds.ENTRY_CREATE); + this.singleExecutor.execute(new Runnable() { + public void run() { + // forever polling + while (DefaultEventProducer.this.running) { + WatchKey key; + try { + key = DefaultEventProducer.this.watchService.take(); + } catch (InterruptedException x) { + continue; + } + // break if the directory was removed + if (!DefaultEventProducer.this.processEvents(key)) { + break; + } + } + + try { + DefaultEventProducer.this.watchService.close(); + } catch (IOException ioe) { + // log here + } + } + }); + } + + static String nameFromOffset(final long offset) { + return fileNameFormat.format(offset) + FILE_SUFFIX; + } + + static long startOffsetFromName(final String name) { + return Long.parseLong(name.substring(0, + name.length() - FILE_SUFFIX.length())); + } + + private void initByOffset(final FileSystem fs, final Path parent, final long offset) + throws IOException { + FileStatus[] statuses = fs.listStatus(parent, new PathFilter() { + public boolean accept(Path path) { + return FILE_PATTERN.matcher(path.getName()).matches() + && (nameFromOffset(offset).compareTo(path.getName()) >= 0); + } + }); + + if (statuses.length == 0) { + LOG.info("There is no existing file with its start offset larger than " + + offset); + return; + } + + Arrays.sort(statuses, new Comparator() { + @Override + public int compare(final FileStatus lhs, final FileStatus rhs) { + return lhs.getPath().compareTo(rhs.getPath()); + } + }); + + // FIXME: must consider about the length of file header + FileStatus lastFile = statuses[statuses.length - 1]; + long lastFileOffset = startOffsetFromName(lastFile.getPath().getName()); + if (offset > (lastFileOffset + lastFile.getLen())) { + LOG.info("The file corresponding to the offset " + offset + + " has been deleted."); + return; + } + + for (FileStatus status : statuses) { + // add file into the queue only if it's newer than the tail of the queue + this.fifoQueue.offer(status.getPath()); + } + } + + private synchronized FSDataInputStream getInputStream() throws IOException { + if (this.in == null) { + Path path = this.fifoQueue.peek(); + if (path == null) { + return null; + } + this.in = this.fileSystem.open(path); + LOG.info("Opening file" + path.toUri().getPath()); + } + return this.in; + } + + @SuppressWarnings({ "unchecked" }) + private synchronized boolean processEvents(WatchKey key) { + List newList = new ArrayList(); + + for (WatchEvent event : key.pollEvents()) { + WatchEvent ev = (WatchEvent) event; + Path eventPath = ev.context(); + String realPath = eventPath.getName(); + if (ev.kind() == StandardWatchEventKinds.ENTRY_CREATE + || ev.kind() == StandardWatchEventKinds.ENTRY_MODIFY) { + if (DefaultEventProducer.FILE_PATTERN.matcher(realPath).matches()) { + newList.add(eventPath); + } + } + } + Collections.sort(newList); + Path tail = this.fifoQueue.peekLast(); + // add to the queue + for (Path path : newList) { + // add file into the queue only if it's newer than the tail of the queue + if (path.getName().endsWith(FILE_SUFFIX) + && (tail == null || path.compareTo(tail) > 0)) { + this.fifoQueue.offer(path); + } else { + LOG.info("ignore path " + path.toUri().getPath()); + } + } + return key.reset(); + } + + @Override + public void close() throws IOException { + this.running = false; + this.singleExecutor.shutdown(); + this.watchService.close(); + } + +} diff --git a/hadoop-dragon-core/src/main/java/org/apache/hadoop/realtime/dag/Region.java b/hadoop-dragon-core/src/main/java/org/apache/hadoop/realtime/event/DirectEventEmitter.java similarity index 59% rename from hadoop-dragon-core/src/main/java/org/apache/hadoop/realtime/dag/Region.java rename to hadoop-dragon-core/src/main/java/org/apache/hadoop/realtime/event/DirectEventEmitter.java index 320b061..f21201e 100644 --- a/hadoop-dragon-core/src/main/java/org/apache/hadoop/realtime/dag/Region.java +++ b/hadoop-dragon-core/src/main/java/org/apache/hadoop/realtime/event/DirectEventEmitter.java @@ -15,30 +15,31 @@ * See the License for the specific language governing permissions and * limitations under the License. */ -package org.apache.hadoop.realtime.dag; +package org.apache.hadoop.realtime.event; + +import java.io.IOException; +import java.util.concurrent.TimeUnit; /** - * Region is an *inclusive* range of indices. Esthetically displeasing, but - * convenient for our purposes. */ -public class Region { - - public final int start; - public final int finish; +public class DirectEventEmitter implements EventEmitter { - public Region(int start, int finish) { - if (start > finish) { - throw new IllegalArgumentException("(start > finish): invariant broken"); - } - this.start = start; - this.finish = finish; + @Override + public boolean emitEvent(Event event) throws IOException, + InterruptedException { + return false; } - public int getSize() { - return (finish - start) + 1; + @Override + public boolean + emitEvent(Event event, long timeout, TimeUnit unit) + throws IOException, InterruptedException { + return false; } - public boolean isIn(int index) { - return (index >= start) && (index <= finish); + @Override + public void close() throws IOException { + } -} \ No newline at end of file + +} diff --git a/hadoop-dragon-core/src/main/java/org/apache/hadoop/realtime/event/EventDispatcher.java b/hadoop-dragon-core/src/main/java/org/apache/hadoop/realtime/event/EventDispatcher.java deleted file mode 100644 index a45b760..0000000 --- a/hadoop-dragon-core/src/main/java/org/apache/hadoop/realtime/event/EventDispatcher.java +++ /dev/null @@ -1,16 +0,0 @@ -package org.apache.hadoop.realtime.event; - - -public interface EventDispatcher { - /** - * Dispatch event using stream name. Partitioners may be used to partition - * the event, possibly based on a pre-determined set of fixed named keys. - * - * @param streamName - * name of stream to dispatch on - * @param key - * object to dispatch - */ - void dispatch(String streamName, KEY key, VALUE value); - -} diff --git a/hadoop-dragon-core/src/main/java/org/apache/hadoop/realtime/event/EventEmitter.java b/hadoop-dragon-core/src/main/java/org/apache/hadoop/realtime/event/EventEmitter.java new file mode 100644 index 0000000..80a60a0 --- /dev/null +++ b/hadoop-dragon-core/src/main/java/org/apache/hadoop/realtime/event/EventEmitter.java @@ -0,0 +1,42 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.hadoop.realtime.event; + +import java.io.IOException; +import java.util.concurrent.TimeUnit; + +/** + */ +public interface EventEmitter { + /** + * Generate an output event + */ + public boolean emitEvent(Event event) throws IOException, + InterruptedException; + + /** + * Generate an output event + */ + public boolean emitEvent(Event event, long timeout, + TimeUnit unit) throws IOException, InterruptedException; + + /** + * Close the event emitter. + */ + public void close() throws IOException; +} diff --git a/hadoop-dragon-core/src/main/java/org/apache/hadoop/realtime/event/EventProcessor.java b/hadoop-dragon-core/src/main/java/org/apache/hadoop/realtime/event/EventProcessor.java deleted file mode 100644 index 65fb505..0000000 --- a/hadoop-dragon-core/src/main/java/org/apache/hadoop/realtime/event/EventProcessor.java +++ /dev/null @@ -1,30 +0,0 @@ -package org.apache.hadoop.realtime.event; - -import java.io.IOException; - -import org.apache.hadoop.realtime.job.TaskAttempt; - -public abstract class EventProcessor { - - public abstract class Context - implements TaskAttempt { - } - - /** - * Called once at the beginning of the task. - */ - protected void setup(Context context - ) throws IOException, InterruptedException { - // NOTHING - } - - protected abstract void execute(KEYIN key, VALUEIN value, Context context) throws IOException; - - /** - * Called once at the end of the task. - */ - protected void cleanup(Context context - ) throws IOException, InterruptedException { - // NOTHING - } -} diff --git a/hadoop-dragon-core/src/main/java/org/apache/hadoop/realtime/event/EventProducer.java b/hadoop-dragon-core/src/main/java/org/apache/hadoop/realtime/event/EventProducer.java index 7d6f6c2..7450208 100644 --- a/hadoop-dragon-core/src/main/java/org/apache/hadoop/realtime/event/EventProducer.java +++ b/hadoop-dragon-core/src/main/java/org/apache/hadoop/realtime/event/EventProducer.java @@ -2,8 +2,9 @@ import java.io.Closeable; import java.io.IOException; +import java.util.concurrent.TimeUnit; -import org.apache.hadoop.realtime.job.TaskAttempt; +import org.apache.hadoop.realtime.records.ChildExecutionContext; /** * @@ -17,7 +18,8 @@ public interface EventProducer extends Closeable { * @throws IOException * @throws InterruptedException */ - public void initialize(TaskAttempt context) throws IOException, InterruptedException; + public void initialize(ChildExecutionContext context) throws IOException, + InterruptedException; /** * Read the next event. @@ -26,20 +28,20 @@ public interface EventProducer extends Closeable { * @throws IOException * @throws InterruptedException */ - public boolean nextEvent() throws IOException, InterruptedException; + public Event pollEvent() throws IOException, InterruptedException; /** - * Get the current event. + * Read the next event. * - * @return the object that was read + * @return true if a key/value pair was read * @throws IOException * @throws InterruptedException */ - public Event getCurrentEvent() throws IOException, InterruptedException; - + public Event pollEvent(long timeout, TimeUnit unit) + throws IOException, InterruptedException; /** - * Close the record reader. + * Close the event producer. */ public void close() throws IOException; } diff --git a/hadoop-dragon-core/src/main/java/org/apache/hadoop/realtime/event/EventSink.java b/hadoop-dragon-core/src/main/java/org/apache/hadoop/realtime/event/EventUtils.java similarity index 89% rename from hadoop-dragon-core/src/main/java/org/apache/hadoop/realtime/event/EventSink.java rename to hadoop-dragon-core/src/main/java/org/apache/hadoop/realtime/event/EventUtils.java index e923458..3707204 100644 --- a/hadoop-dragon-core/src/main/java/org/apache/hadoop/realtime/event/EventSink.java +++ b/hadoop-dragon-core/src/main/java/org/apache/hadoop/realtime/event/EventUtils.java @@ -18,8 +18,7 @@ package org.apache.hadoop.realtime.event; /** - * Receives and persists the terminal {@link Event} */ -public interface EventSink { - public void sunk(); +public class EventUtils { + } diff --git a/hadoop-dragon-core/src/main/java/org/apache/hadoop/realtime/event/SequenceFileEventProduer.java b/hadoop-dragon-core/src/main/java/org/apache/hadoop/realtime/event/SequenceFileEventProduer.java deleted file mode 100644 index 7d0cfb7..0000000 --- a/hadoop-dragon-core/src/main/java/org/apache/hadoop/realtime/event/SequenceFileEventProduer.java +++ /dev/null @@ -1,70 +0,0 @@ -/** - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ -package org.apache.hadoop.realtime.event; - -import java.io.IOException; - -import org.apache.hadoop.conf.Configuration; -import org.apache.hadoop.fs.FileSystem; -import org.apache.hadoop.fs.Path; -import org.apache.hadoop.io.SequenceFile; -import org.apache.hadoop.realtime.job.TaskAttempt; - -/** - * An {@link EventProducer} that continuously produce {@link Event} - * from {@link SequenceFile}s by reading key, value pairs. - */ -public class SequenceFileEventProduer implements - EventProducer { - - private SequenceFile.Reader in; - protected Configuration conf; - - public SequenceFileEventProduer(Configuration conf, Path path) - throws IOException { - FileSystem fs = path.getFileSystem(conf); - - } - - public SequenceFileEventProduer(Configuration conf, Path path, long offset) - throws IOException { - - } - - @Override - public void initialize(TaskAttempt context) throws IOException, - InterruptedException { - - } - - @Override - public boolean nextEvent() throws IOException, InterruptedException { - return false; - } - - @Override - public Event getCurrentEvent() throws IOException, - InterruptedException { - return null; - } - - @Override - public void close() throws IOException { - } - -} diff --git a/hadoop-dragon-core/src/main/java/org/apache/hadoop/realtime/fs/AbstractWatchService.java b/hadoop-dragon-core/src/main/java/org/apache/hadoop/realtime/fs/AbstractWatchService.java index cc782a2..8539117 100644 --- a/hadoop-dragon-core/src/main/java/org/apache/hadoop/realtime/fs/AbstractWatchService.java +++ b/hadoop-dragon-core/src/main/java/org/apache/hadoop/realtime/fs/AbstractWatchService.java @@ -52,12 +52,6 @@ public void cancel() { protected AbstractWatchService() { } - /** - * Register the given object with this watch service - */ - abstract WatchKey register(Path path, WatchEvent.Kind... events) - throws IOException; - // used by AbstractWatchKey to enqueue key final void enqueueKey(WatchKey key) { pendingKeys.offer(key); diff --git a/hadoop-dragon-core/src/main/java/org/apache/hadoop/realtime/fs/DefaultWatchService.java b/hadoop-dragon-core/src/main/java/org/apache/hadoop/realtime/fs/DefaultWatchService.java index 9004593..d886320 100644 --- a/hadoop-dragon-core/src/main/java/org/apache/hadoop/realtime/fs/DefaultWatchService.java +++ b/hadoop-dragon-core/src/main/java/org/apache/hadoop/realtime/fs/DefaultWatchService.java @@ -71,7 +71,7 @@ public Thread newThread(Runnable r) { * Register the given file with this watch service */ @Override - WatchKey register(final Path path, WatchEvent.Kind... events) + public WatchKey register(final Path path, WatchEvent.Kind... events) throws IOException { // check events - CCE will be thrown if there are invalid elements if (events.length == 0) @@ -265,7 +265,7 @@ public void run() { } }; this.poller = - scheduledExecutor.scheduleAtFixedRate(thunk, period, period, + scheduledExecutor.scheduleAtFixedRate(thunk, 0, period, TimeUnit.MILLISECONDS); } } diff --git a/hadoop-dragon-core/src/main/java/org/apache/hadoop/realtime/fs/WatchService.java b/hadoop-dragon-core/src/main/java/org/apache/hadoop/realtime/fs/WatchService.java index 7777d2f..e0ef2c9 100644 --- a/hadoop-dragon-core/src/main/java/org/apache/hadoop/realtime/fs/WatchService.java +++ b/hadoop-dragon-core/src/main/java/org/apache/hadoop/realtime/fs/WatchService.java @@ -22,6 +22,8 @@ import java.io.IOException; import java.util.concurrent.TimeUnit; +import org.apache.hadoop.fs.Path; + /** * A watch service that watches registered objects for changes and * events. For example a file manager may use a watch service to monitor a @@ -119,6 +121,14 @@ public interface WatchService */ @Override void close() throws IOException; + + + /** + * Register the given object with this watch service + */ + WatchKey register(Path path, WatchEvent.Kind... events) + throws IOException; + /** * Retrieves and removes the next watch key, or {@code null} if none are diff --git a/hadoop-dragon-core/src/main/java/org/apache/hadoop/realtime/job/Mapper.java b/hadoop-dragon-core/src/main/java/org/apache/hadoop/realtime/job/Mapper.java deleted file mode 100644 index fd9c9bf..0000000 --- a/hadoop-dragon-core/src/main/java/org/apache/hadoop/realtime/job/Mapper.java +++ /dev/null @@ -1,5 +0,0 @@ -package org.apache.hadoop.realtime.job; - -public class Mapper { - -} diff --git a/hadoop-dragon-core/src/main/java/org/apache/hadoop/realtime/job/Reducer.java b/hadoop-dragon-core/src/main/java/org/apache/hadoop/realtime/job/Reducer.java deleted file mode 100644 index 3cedc56..0000000 --- a/hadoop-dragon-core/src/main/java/org/apache/hadoop/realtime/job/Reducer.java +++ /dev/null @@ -1,5 +0,0 @@ -package org.apache.hadoop.realtime.job; - -public class Reducer { - -} diff --git a/hadoop-dragon-core/src/main/java/org/apache/hadoop/realtime/mr/MapContext.java b/hadoop-dragon-core/src/main/java/org/apache/hadoop/realtime/mr/MapContext.java new file mode 100644 index 0000000..8784a63 --- /dev/null +++ b/hadoop-dragon-core/src/main/java/org/apache/hadoop/realtime/mr/MapContext.java @@ -0,0 +1,38 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.hadoop.realtime.mr; + +import org.apache.hadoop.classification.InterfaceAudience; +import org.apache.hadoop.classification.InterfaceStability; +import org.apache.hadoop.realtime.child.Context; + +/** + * The context that is given to the {@link Mapper}. + * @param the key input type to the Mapper + * @param the value input type to the Mapper + * @param the key output type from the Mapper + * @param the value output type from the Mapper + */ +@InterfaceAudience.Public +@InterfaceStability.Evolving +public interface MapContext + extends Context { + +} + \ No newline at end of file diff --git a/hadoop-dragon-core/src/main/java/org/apache/hadoop/realtime/mr/MapContextImpl.java b/hadoop-dragon-core/src/main/java/org/apache/hadoop/realtime/mr/MapContextImpl.java new file mode 100644 index 0000000..da01298 --- /dev/null +++ b/hadoop-dragon-core/src/main/java/org/apache/hadoop/realtime/mr/MapContextImpl.java @@ -0,0 +1,101 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.hadoop.realtime.mr; + +import java.io.IOException; +import java.util.concurrent.TimeUnit; + +import org.apache.hadoop.classification.InterfaceAudience; +import org.apache.hadoop.classification.InterfaceStability; +import org.apache.hadoop.realtime.conf.DragonConfiguration; +import org.apache.hadoop.realtime.event.Event; +import org.apache.hadoop.realtime.records.TaskAttemptId; +import org.apache.hadoop.realtime.records.TaskType; + +/** + * The context that is given to the {@link Mapper}. + * + * @param the key input type to the Mapper + * @param the value input type to the Mapper + * @param the key output type from the Mapper + * @param the value output type from the Mapper + */ +@InterfaceAudience.Private +@InterfaceStability.Unstable +public class MapContextImpl implements + MapContext { + + @Override + public DragonConfiguration getConfiguration() { + // TODO Auto-generated method stub + return null; + } + + @Override + public TaskAttemptId getTaskAttemptId() { + // TODO Auto-generated method stub + return null; + } + + @Override + public int getPartition() { + // TODO Auto-generated method stub + return 0; + } + + @Override + public String getUser() { + // TODO Auto-generated method stub + return null; + } + + @Override + public TaskType getTaskType() { + // TODO Auto-generated method stub + return null; + } + + @Override + public Event pollEvent() throws IOException, + InterruptedException { + // TODO Auto-generated method stub + return null; + } + + @Override + public Event pollEvent(long timeout, TimeUnit unit) + throws IOException, InterruptedException { + // TODO Auto-generated method stub + return null; + } + + @Override + public boolean emitEvent(Event event) throws IOException, + InterruptedException { + return false; + } + + @Override + public boolean emitEvent(Event event, long timeout, + TimeUnit unit) throws IOException, InterruptedException { + // TODO Auto-generated method stub + return false; + } + +} diff --git a/hadoop-dragon-core/src/main/java/org/apache/hadoop/realtime/mr/Mapper.java b/hadoop-dragon-core/src/main/java/org/apache/hadoop/realtime/mr/Mapper.java new file mode 100644 index 0000000..f156641 --- /dev/null +++ b/hadoop-dragon-core/src/main/java/org/apache/hadoop/realtime/mr/Mapper.java @@ -0,0 +1,79 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.hadoop.realtime.mr; + +import java.io.IOException; + +import org.apache.hadoop.classification.InterfaceAudience; +import org.apache.hadoop.classification.InterfaceStability; +import org.apache.hadoop.realtime.event.Event; + +@InterfaceAudience.Public +@InterfaceStability.Evolving +public abstract class Mapper { + + /** + * Called once at the beginning of the task. + */ + protected void setup(MapContext context) + throws IOException, InterruptedException { + // NOTHING + } + + /** + * Called once for each event in the input. Most applications should override + * this, but the default is the identity function. + */ + @SuppressWarnings("unchecked") + protected void map(Event event, + MapContext context) throws IOException, + InterruptedException { + context.emitEvent((Event) event); + } + + /** + * Called once at the end of the task. + */ + protected void cleanup(MapContext context) + throws IOException, InterruptedException { + // NOTHING + } + + /** + * Expert users can override this method for more complete control over the + * execution of the Mapper. + * + * @param context + * @throws IOException + */ + public void run(MapContext context) + throws IOException, InterruptedException { + setup(context); + try { + while (true) { + Event evt = context.pollEvent(); + if (evt != null) + map(evt, context); + } + } catch (InterruptedException ie) { + cleanup(context); + throw ie; + } + } +} diff --git a/hadoop-dragon-core/src/main/java/org/apache/hadoop/realtime/mr/Partitioner.java b/hadoop-dragon-core/src/main/java/org/apache/hadoop/realtime/mr/Partitioner.java new file mode 100644 index 0000000..b748977 --- /dev/null +++ b/hadoop-dragon-core/src/main/java/org/apache/hadoop/realtime/mr/Partitioner.java @@ -0,0 +1,49 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.hadoop.realtime.mr; + +import org.apache.hadoop.realtime.event.Event; + +/** + * Partitions the key space. + * + *

Partitioner controls the partitioning of the keys of the + * intermediate map-outputs. The key (or a subset of the key) is used to derive + * the partition, typically by a hash function. The total number of partitions + * is the same as the number of reduce tasks for the job. Hence this controls + * which of the m reduce tasks the intermediate key (and hence the + * record) is sent for reduction.

+ * + * @see Reducer + */ +public interface Partitioner{ + + /** + * Get the paritition number for a given key (hence record) given the total + * number of partitions i.e. number of reduce-tasks for the job. + * + *

Typically a hash function on a all or a subset of the key.

+ * + * @param key the key to be paritioned. + * @param value the entry value. + * @param numPartitions the total number of partitions. + * @return the partition number for the key. + */ + int getPartition(Event event, int numPartitions); +} diff --git a/hadoop-dragon-core/src/main/java/org/apache/hadoop/realtime/dag/CycleFoundException.java b/hadoop-dragon-core/src/main/java/org/apache/hadoop/realtime/mr/ReduceContext.java similarity index 68% rename from hadoop-dragon-core/src/main/java/org/apache/hadoop/realtime/dag/CycleFoundException.java rename to hadoop-dragon-core/src/main/java/org/apache/hadoop/realtime/mr/ReduceContext.java index eefaecf..7ff982e 100644 --- a/hadoop-dragon-core/src/main/java/org/apache/hadoop/realtime/dag/CycleFoundException.java +++ b/hadoop-dragon-core/src/main/java/org/apache/hadoop/realtime/mr/ReduceContext.java @@ -15,13 +15,17 @@ * See the License for the specific language governing permissions and * limitations under the License. */ +package org.apache.hadoop.realtime.mr; -package org.apache.hadoop.realtime.dag; +import org.apache.hadoop.classification.InterfaceAudience; +import org.apache.hadoop.classification.InterfaceStability; +import org.apache.hadoop.realtime.child.Context; /** - * Exception used on a {@link DirectedAcyclicGraph} when a cycle is found - * */ -@SuppressWarnings("serial") -public class CycleFoundException extends Exception { +@InterfaceAudience.Public +@InterfaceStability.Evolving +public interface ReduceContext extends + Context { + } diff --git a/hadoop-dragon-core/src/main/java/org/apache/hadoop/realtime/mr/Reducer.java b/hadoop-dragon-core/src/main/java/org/apache/hadoop/realtime/mr/Reducer.java new file mode 100644 index 0000000..4eb05bc --- /dev/null +++ b/hadoop-dragon-core/src/main/java/org/apache/hadoop/realtime/mr/Reducer.java @@ -0,0 +1,77 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.hadoop.realtime.mr; + +import java.io.IOException; +import org.apache.hadoop.classification.InterfaceAudience; +import org.apache.hadoop.classification.InterfaceStability; +import org.apache.hadoop.realtime.child.Context; +import org.apache.hadoop.realtime.event.Event; + +/** + */ +@InterfaceAudience.Public +@InterfaceStability.Evolving +public abstract class Reducer { + + /** + * Called once at the start of the task. + */ + protected void setup(ReduceContext context) + throws IOException, InterruptedException { + // NOTHING + } + + /** + * This method is called once for each key. Most applications will define + * their reduce class by overriding this method. The default implementation is + * an identity function. + */ + @SuppressWarnings("unchecked") + protected abstract void reduce(Event event, + ReduceContext context) + throws IOException, InterruptedException; + + /** + * Called once at the end of the task. + */ + protected void + cleanup(ReduceContext context) + throws IOException, InterruptedException { + // NOTHING + } + + /** + * Advanced application writers can use the + * {@link #run(org.apache.hadoop.mapreduce.Reducer.Context)} method to control + * how the reduce task works. + */ + public void run(ReduceContext context) + throws IOException, InterruptedException { + setup(context); + try { + while (true) { + reduce(context.pollEvent(), context); + } + } catch (InterruptedException ie) { + cleanup(context); + throw ie; + } + } +}