diff --git a/src/main/2.7.0/org/apache/hadoop/util/CollectionGSetWrapper.java b/src/main/2.7.0/org/apache/hadoop/util/CollectionGSetWrapper.java
new file mode 100644
index 00000000..e0e573df
--- /dev/null
+++ b/src/main/2.7.0/org/apache/hadoop/util/CollectionGSetWrapper.java
@@ -0,0 +1,86 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.hadoop.util;
+
+import com.googlecode.cqengine.IndexedCollection;
+import java.util.Iterator;
+import org.apache.hadoop.hdfs.server.namenode.INode;
+import org.apache.hadoop.hdfs.server.namenode.INodeWithAdditionalFields;
+import org.jetbrains.annotations.NotNull;
+
+public class CollectionGSetWrapper implements GSet<INode, INodeWithAdditionalFields> {
+
+  private final IndexedCollection<INode> wrapped;
+  private final GSet<INode, INodeWithAdditionalFields> backing;
+
+  public CollectionGSetWrapper(
+      IndexedCollection<INode> wrapped, GSet<INode, INodeWithAdditionalFields> backing) {
+    this.wrapped = wrapped;
+    this.backing = backing;
+  }
+
+  @Override
+  public int size() {
+    return backing.size();
+  }
+
+  @Override
+  public boolean contains(INode element) {
+    return backing.contains(element);
+  }
+
+  @Override
+  public INodeWithAdditionalFields get(INode element) {
+    return backing.get(element);
+  }
+
+  @Override
+  public INodeWithAdditionalFields put(INodeWithAdditionalFields element) {
+    if (backing.contains(element)) {
+      INodeWithAdditionalFields prev = backing.get(element);
+      wrapped.add(element);
+      return prev;
+    } else {
+      wrapped.add(element);
+      return null;
+    }
+  }
+
+  @Override
+  public INodeWithAdditionalFields remove(INode element) {
+    if (backing.contains(element)) {
+      INodeWithAdditionalFields prev = backing.get(element);
+      wrapped.remove(element);
+      return prev;
+    }
+    return null;
+  }
+
+  @Override
+  public void clear() {
+    wrapped.clear();
+  }
+
+  @NotNull
+  @Override
+  public Iterator<INodeWithAdditionalFields> iterator() {
+    return backing.iterator();
+  }
+}
diff --git a/src/main/2.8.0/org/apache/hadoop/util/CollectionGSetWrapper.java b/src/main/2.8.0/org/apache/hadoop/util/CollectionGSetWrapper.java
new file mode 100644
index 00000000..dd73ac2b
--- /dev/null
+++ b/src/main/2.8.0/org/apache/hadoop/util/CollectionGSetWrapper.java
@@ -0,0 +1,92 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.hadoop.util;
+
+import com.googlecode.cqengine.IndexedCollection;
+import java.util.Collection;
+import java.util.Iterator;
+import org.apache.hadoop.hdfs.server.namenode.INode;
+import org.apache.hadoop.hdfs.server.namenode.INodeWithAdditionalFields;
+import org.jetbrains.annotations.NotNull;
+
+public class CollectionGSetWrapper implements GSet<INode, INodeWithAdditionalFields> {
+
+  private final IndexedCollection<INode> wrapped;
+  private final GSet<INode, INodeWithAdditionalFields> backing;
+
+  public CollectionGSetWrapper(
+      IndexedCollection<INode> wrapped, GSet<INode, INodeWithAdditionalFields> backing) {
+    this.wrapped = wrapped;
+    this.backing = backing;
+  }
+
+  @Override
+  public int size() {
+    return backing.size();
+  }
+
+  @Override
+  public boolean contains(INode element) {
+    return backing.contains(element);
+  }
+
+  @Override
+  public INodeWithAdditionalFields get(INode element) {
+    return backing.get(element);
+  }
+
+  @Override
+  public INodeWithAdditionalFields put(INodeWithAdditionalFields element) {
+    if (backing.contains(element)) {
+      INodeWithAdditionalFields prev = backing.get(element);
+      wrapped.add(element);
+      return prev;
+    } else {
+      wrapped.add(element);
+      return null;
+    }
+  }
+
+  @Override
+  public INodeWithAdditionalFields remove(INode element) {
+    if (backing.contains(element)) {
+      INodeWithAdditionalFields prev = backing.get(element);
+      wrapped.remove(element);
+      return prev;
+    }
+    return null;
+  }
+
+  @Override
+  public void clear() {
+    wrapped.clear();
+  }
+
+  @Override
+  public Collection<INodeWithAdditionalFields> values() {
+    return backing.values();
+  }
+
+  @NotNull
+  @Override
+  public Iterator<INodeWithAdditionalFields> iterator() {
+    return backing.iterator();
+  }
+}
diff --git a/src/main/2.9.0/org/apache/hadoop/util/CollectionGSetWrapper.java b/src/main/2.9.0/org/apache/hadoop/util/CollectionGSetWrapper.java
new file mode 100644
index 00000000..dd73ac2b
--- /dev/null
+++ b/src/main/2.9.0/org/apache/hadoop/util/CollectionGSetWrapper.java
@@ -0,0 +1,92 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.hadoop.util;
+
+import com.googlecode.cqengine.IndexedCollection;
+import java.util.Collection;
+import java.util.Iterator;
+import org.apache.hadoop.hdfs.server.namenode.INode;
+import org.apache.hadoop.hdfs.server.namenode.INodeWithAdditionalFields;
+import org.jetbrains.annotations.NotNull;
+
+public class CollectionGSetWrapper implements GSet<INode, INodeWithAdditionalFields> {
+
+  private final IndexedCollection<INode> wrapped;
+  private final GSet<INode, INodeWithAdditionalFields> backing;
+
+  public CollectionGSetWrapper(
+      IndexedCollection<INode> wrapped, GSet<INode, INodeWithAdditionalFields> backing) {
+    this.wrapped = wrapped;
+    this.backing = backing;
+  }
+
+  @Override
+  public int size() {
+    return backing.size();
+  }
+
+  @Override
+  public boolean contains(INode element) {
+    return backing.contains(element);
+  }
+
+  @Override
+  public INodeWithAdditionalFields get(INode element) {
+    return backing.get(element);
+  }
+
+  @Override
+  public INodeWithAdditionalFields put(INodeWithAdditionalFields element) {
+    if (backing.contains(element)) {
+      INodeWithAdditionalFields prev = backing.get(element);
+      wrapped.add(element);
+      return prev;
+    } else {
+      wrapped.add(element);
+      return null;
+    }
+  }
+
+  @Override
+  public INodeWithAdditionalFields remove(INode element) {
+    if (backing.contains(element)) {
+      INodeWithAdditionalFields prev = backing.get(element);
+      wrapped.remove(element);
+      return prev;
+    }
+    return null;
+  }
+
+  @Override
+  public void clear() {
+    wrapped.clear();
+  }
+
+  @Override
+  public Collection<INodeWithAdditionalFields> values() {
+    return backing.values();
+  }
+
+  @NotNull
+  @Override
+  public Iterator<INodeWithAdditionalFields> iterator() {
+    return backing.iterator();
+  }
+}
diff --git a/src/main/3.0.0/org/apache/hadoop/util/CollectionGSetWrapper.java b/src/main/3.0.0/org/apache/hadoop/util/CollectionGSetWrapper.java
new file mode 100644
index 00000000..dd73ac2b
--- /dev/null
+++ b/src/main/3.0.0/org/apache/hadoop/util/CollectionGSetWrapper.java
@@ -0,0 +1,92 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.hadoop.util;
+
+import com.googlecode.cqengine.IndexedCollection;
+import java.util.Collection;
+import java.util.Iterator;
+import org.apache.hadoop.hdfs.server.namenode.INode;
+import org.apache.hadoop.hdfs.server.namenode.INodeWithAdditionalFields;
+import org.jetbrains.annotations.NotNull;
+
+public class CollectionGSetWrapper implements GSet<INode, INodeWithAdditionalFields> {
+
+  private final IndexedCollection<INode> wrapped;
+  private final GSet<INode, INodeWithAdditionalFields> backing;
+
+  public CollectionGSetWrapper(
+      IndexedCollection<INode> wrapped, GSet<INode, INodeWithAdditionalFields> backing) {
+    this.wrapped = wrapped;
+    this.backing = backing;
+  }
+
+  @Override
+  public int size() {
+    return backing.size();
+  }
+
+  @Override
+  public boolean contains(INode element) {
+    return backing.contains(element);
+  }
+
+  @Override
+  public INodeWithAdditionalFields get(INode element) {
+    return backing.get(element);
+  }
+
+  @Override
+  public INodeWithAdditionalFields put(INodeWithAdditionalFields element) {
+    if (backing.contains(element)) {
+      INodeWithAdditionalFields prev = backing.get(element);
+      wrapped.add(element);
+      return prev;
+    } else {
+      wrapped.add(element);
+      return null;
+    }
+  }
+
+  @Override
+  public INodeWithAdditionalFields remove(INode element) {
+    if (backing.contains(element)) {
+      INodeWithAdditionalFields prev = backing.get(element);
+      wrapped.remove(element);
+      return prev;
+    }
+    return null;
+  }
+
+  @Override
+  public void clear() {
+    wrapped.clear();
+  }
+
+  @Override
+  public Collection<INodeWithAdditionalFields> values() {
+    return backing.values();
+  }
+
+  @NotNull
+  @Override
+  public Iterator<INodeWithAdditionalFields> iterator() {
+    return backing.iterator();
+  }
+}
diff --git a/src/main/3.1.0/org/apache/hadoop/util/CollectionGSetWrapper.java b/src/main/3.1.0/org/apache/hadoop/util/CollectionGSetWrapper.java
new file mode 100644
index 00000000..dd73ac2b
--- /dev/null
+++ b/src/main/3.1.0/org/apache/hadoop/util/CollectionGSetWrapper.java
@@ -0,0 +1,92 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.hadoop.util;
+
+import com.googlecode.cqengine.IndexedCollection;
+import java.util.Collection;
+import java.util.Iterator;
+import org.apache.hadoop.hdfs.server.namenode.INode;
+import org.apache.hadoop.hdfs.server.namenode.INodeWithAdditionalFields;
+import org.jetbrains.annotations.NotNull;
+
+public class CollectionGSetWrapper implements GSet<INode, INodeWithAdditionalFields> {
+
+  private final IndexedCollection<INode> wrapped;
+  private final GSet<INode, INodeWithAdditionalFields> backing;
+
+  public CollectionGSetWrapper(
+      IndexedCollection<INode> wrapped, GSet<INode, INodeWithAdditionalFields> backing) {
+    this.wrapped = wrapped;
+    this.backing = backing;
+  }
+
+  @Override
+  public int size() {
+    return backing.size();
+  }
+
+  @Override
+  public boolean contains(INode element) {
+    return backing.contains(element);
+  }
+
+  @Override
+  public INodeWithAdditionalFields get(INode element) {
+    return backing.get(element);
+  }
+
+  @Override
+  public INodeWithAdditionalFields put(INodeWithAdditionalFields element) {
+    if (backing.contains(element)) {
+      INodeWithAdditionalFields prev = backing.get(element);
+      wrapped.add(element);
+      return prev;
+    } else {
+      wrapped.add(element);
+      return null;
+    }
+  }
+
+  @Override
+  public INodeWithAdditionalFields remove(INode element) {
+    if (backing.contains(element)) {
+      INodeWithAdditionalFields prev = backing.get(element);
+      wrapped.remove(element);
+      return prev;
+    }
+    return null;
+  }
+
+  @Override
+  public void clear() {
+    wrapped.clear();
+  }
+
+  @Override
+  public Collection<INodeWithAdditionalFields> values() {
+    return backing.values();
+  }
+
+  @NotNull
+  @Override
+  public Iterator<INodeWithAdditionalFields> iterator() {
+    return backing.iterator();
+  }
+}
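Note on the wrappers above: reads (size/contains/get/iterator) are answered by the fast backing GSet, while put/remove are funneled through the CQEngine IndexedCollection. Because JavaCollectionQEngine (below) builds that IndexedCollection with WrappingPersistence around the same backing data, a single add() keeps the inode set and every registered index consistent. A minimal, self-contained sketch of that wrapping behavior — illustrative types only, under the assumption that the CQEngine 2.x APIs used elsewhere in this change behave as shown:

    import com.googlecode.cqengine.ConcurrentIndexedCollection;
    import com.googlecode.cqengine.IndexedCollection;
    import com.googlecode.cqengine.attribute.SimpleAttribute;
    import com.googlecode.cqengine.index.hash.HashIndex;
    import com.googlecode.cqengine.persistence.wrapping.WrappingPersistence;
    import com.googlecode.cqengine.query.option.QueryOptions;
    import java.util.HashSet;
    import java.util.Set;

    public class WrappingPersistenceSketch {

      // Identity attribute standing in for the "id" primary-key attribute on INodes.
      static final SimpleAttribute<Long, Long> SELF =
          new SimpleAttribute<Long, Long>("self") {
            @Override
            public Long getValue(Long object, QueryOptions queryOptions) {
              return object;
            }
          };

      public static void main(String[] args) {
        Set<Long> backing = new HashSet<>(); // stands in for the NameNode's GSet
        IndexedCollection<Long> indexed =
            new ConcurrentIndexedCollection<>(
                WrappingPersistence.aroundCollectionOnPrimaryKey(backing, SELF));
        indexed.addIndex(HashIndex.onAttribute(SELF));

        indexed.add(42L); // one write updates the index *and* the wrapped collection
        System.out.println(backing.contains(42L)); // true
      }
    }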
diff --git a/src/main/java/org/apache/hadoop/hdfs/server/namenode/AbstractQueryEngine.java b/src/main/java/org/apache/hadoop/hdfs/server/namenode/AbstractQueryEngine.java
index e8e898cf..dc5eb629 100644
--- a/src/main/java/org/apache/hadoop/hdfs/server/namenode/AbstractQueryEngine.java
+++ b/src/main/java/org/apache/hadoop/hdfs/server/namenode/AbstractQueryEngine.java
@@ -21,6 +21,7 @@
 import java.io.IOException;
 import java.io.PrintWriter;
+import java.lang.reflect.Field;
 import java.math.BigInteger;
 import java.text.ParseException;
 import java.text.SimpleDateFormat;
@@ -42,6 +43,7 @@
 import java.util.stream.IntStream;
 import java.util.stream.LongStream;
 import java.util.stream.Stream;
+import java.util.stream.StreamSupport;
 import javax.servlet.http.HttpServletResponse;
 import org.apache.hadoop.fs.Path;
 import org.apache.hadoop.hdfs.server.namenode.queries.FileTypeHistogram;
@@ -50,22 +52,92 @@
 import org.apache.hadoop.hdfs.server.namenode.queries.SpaceSizeHistogram;
 import org.apache.hadoop.hdfs.server.namenode.queries.TimeHistogram;
 import org.apache.hadoop.io.IOUtils;
+import org.apache.hadoop.util.CollectionsView;
+import org.apache.hadoop.util.GSet;
+import org.apache.hadoop.util.GSetSeperatorWrapper;
 import org.jetbrains.annotations.NotNull;
 
 public abstract class AbstractQueryEngine implements QueryEngine {
 
-  VersionInterface versionLoader;
-  NameNodeLoader nameNodeLoader;
+  protected Collection<INode> all;
+  protected Map<INode, INodeWithAdditionalFields> files;
+  protected Map<INode, INodeWithAdditionalFields> dirs;
+
+  private VersionInterface versionLoader;
 
   @Override // QueryEngine
-  public void setContexts(NameNodeLoader nameNodeLoader, VersionInterface versionLoader) {
-    this.nameNodeLoader = nameNodeLoader;
+  public void setVersionContext(VersionInterface versionLoader) {
     this.versionLoader = versionLoader;
   }
 
+  @SuppressWarnings("unchecked") /* We do unchecked casting to extract GSets */
+  @Override // QueryEngine
+  public void handleGSet(GSet<INode, INodeWithAdditionalFields> preloaded, FSNamesystem namesystem)
+      throws Exception {
+    if (preloaded != null) {
+      filterINodes(preloaded);
+      return;
+    }
+
+    namesystem.writeLock();
+    try {
+      FSDirectory fsDirectory = namesystem.getFSDirectory();
+      INodeMap inodeMap = fsDirectory.getINodeMap();
+      Field mapField = inodeMap.getClass().getDeclaredField("map");
+      mapField.setAccessible(true);
+      GSet<INode, INodeWithAdditionalFields> gset =
+          (GSet<INode, INodeWithAdditionalFields>) mapField.get(inodeMap);
+
+      filterINodes(gset);
+
+      GSet<INode, INodeWithAdditionalFields> newGSet = new GSetSeperatorWrapper(files, dirs);
+      mapField.set(inodeMap, newGSet);
+    } finally {
+      namesystem.writeUnlock();
+    }
+  }
+
+  @SuppressWarnings("unchecked") /* We do unchecked casting to extract GSets */
+  private void filterINodes(GSet<INode, INodeWithAdditionalFields> gset) {
+    final long start = System.currentTimeMillis();
+    files =
+        StreamSupport.stream(gset.spliterator(), true)
+            .filter(INode::isFile)
+            .collect(Collectors.toConcurrentMap(node -> node, node -> node));
+    dirs =
+        StreamSupport.stream(gset.spliterator(), true)
+            .filter(INode::isDirectory)
+            .collect(Collectors.toConcurrentMap(node -> node, node -> node));
+    all = CollectionsView.combine(files.keySet(), dirs.keySet());
+    final long end = System.currentTimeMillis();
+    LOG.info("Performing AbstractQE filtering of files and dirs took: {} ms.", (end - start));
+  }
+
   @Override // QueryEngine
   public Collection<INode> getINodeSet(String set) {
-    return nameNodeLoader.getINodeSetInternal(set);
+    long start = System.currentTimeMillis();
+    Collection<INode> inodes;
+    switch (set) {
+      case "all":
+        inodes = all;
+        break;
+      case "files":
+        inodes = files.keySet();
+        break;
+      case "dirs":
+        inodes = dirs.keySet();
+        break;
+      default:
+        throw new IllegalArgumentException(
+            "You did not specify a set to use. Please check /sets for available sets.");
+    }
+    long end = System.currentTimeMillis();
+    LOG.info(
+        "Fetching set of: {} had result size: {} and took: {} ms.",
+        set,
+        inodes.size(),
+        (end - start));
+    return inodes;
   }
 
   /**
@@ -1815,4 +1887,17 @@ private Function<INode, Long> getTransformFunction(
     }
     return stdFunc;
   }
+
+  @Override // QueryEngine
+  public void clear() {
+    if (all != null) {
+      all.clear();
+    }
+    if (files != null) {
+      files.clear();
+    }
+    if (dirs != null) {
+      dirs.clear();
+    }
+  }
 }
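handleGSet() above works by reflectively swapping the NameNode's private INodeMap "map" field so that subsequent edit-log tailing writes flow through NNA's replacement GSet. A stripped-down sketch of that field-swap idiom; Holder is a hypothetical stand-in for INodeMap, and the real swap happens under the namesystem write lock:

    import java.lang.reflect.Field;
    import java.util.HashMap;
    import java.util.Map;

    public class FieldSwapSketch {

      // Hypothetical stand-in for INodeMap, which hides its GSet in a private field.
      static class Holder {
        private Map<String, String> map = new HashMap<>();

        void put(String k, String v) {
          map.put(k, v);
        }
      }

      public static void main(String[] args) throws Exception {
        Holder holder = new Holder();

        Field mapField = Holder.class.getDeclaredField("map");
        mapField.setAccessible(true);

        @SuppressWarnings("unchecked")
        Map<String, String> original = (Map<String, String>) mapField.get(holder);

        // Swap in a replacement that sees the same entries; from now on the
        // holder's writes land in our map, mirroring the GSetSeperatorWrapper swap.
        Map<String, String> replacement = new HashMap<>(original);
        mapField.set(holder, replacement);

        holder.put("k", "v");
        System.out.println(replacement.containsKey("k")); // true
      }
    }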
diff --git a/src/main/java/org/apache/hadoop/hdfs/server/namenode/JavaCollectionQEngine.java b/src/main/java/org/apache/hadoop/hdfs/server/namenode/JavaCollectionQEngine.java
index 70114c42..4643f572 100644
--- a/src/main/java/org/apache/hadoop/hdfs/server/namenode/JavaCollectionQEngine.java
+++ b/src/main/java/org/apache/hadoop/hdfs/server/namenode/JavaCollectionQEngine.java
@@ -35,12 +35,17 @@
 import com.googlecode.cqengine.IndexedCollection;
 import com.googlecode.cqengine.attribute.Attribute;
 import com.googlecode.cqengine.attribute.SimpleAttribute;
+import com.googlecode.cqengine.index.hash.HashIndex;
+import com.googlecode.cqengine.index.hash.HashIndex.CompactValueSetFactory;
+import com.googlecode.cqengine.index.hash.HashIndex.DefaultIndexMapFactory;
 import com.googlecode.cqengine.persistence.wrapping.WrappingPersistence;
+import com.googlecode.cqengine.quantizer.LongQuantizer;
 import com.googlecode.cqengine.query.Query;
 import com.googlecode.cqengine.query.parser.sql.SQLParser;
 import com.googlecode.cqengine.resultset.ResultSet;
 import java.io.IOException;
 import java.io.PrintWriter;
+import java.lang.reflect.Field;
 import java.text.ParseException;
 import java.text.SimpleDateFormat;
 import java.util.ArrayList;
@@ -55,10 +60,16 @@
 import javax.servlet.http.HttpServletRequest;
 import javax.servlet.http.HttpServletResponse;
 import javax.ws.rs.core.MultivaluedMap;
+import org.apache.hadoop.util.CollectionGSetWrapper;
+import org.apache.hadoop.util.GSet;
+import org.apache.hadoop.util.GSetCollectionWrapper;
 import org.apache.http.HttpStatus;
 
 public class JavaCollectionQEngine extends AbstractQueryEngine {
 
+  private final SimpleAttribute<INode, Boolean> isFile = attribute("isFile", INode::isFile);
+  private final SimpleAttribute<INode, Boolean> isDir = attribute("isDir", INode::isDirectory);
+
   private final SimpleAttribute<INode, Long> id =
       attribute("id", node -> getFilterFunctionToLongForINode("id").apply(node));
   private final SimpleAttribute<INode, Long> accessTime =
@@ -144,13 +155,9 @@ public class JavaCollectionQEngine extends AbstractQueryEngine {
   private SimpleAttribute<INode, Long> dirSubTreeNumDirs;
   private SimpleAttribute<INode, Long> storageType;
 
-  private IndexedCollection<INode> indexedFiles;
-  private IndexedCollection<INode> indexedDirs;
-
   @Override // QueryEngine
-  public void setContexts(NameNodeLoader loader, VersionInterface versionLoader) {
-    this.nameNodeLoader = loader;
-    this.versionLoader = versionLoader;
+  public void setVersionContext(VersionInterface versionLoader) {
+    super.setVersionContext(versionLoader);
 
     dirNumChildren =
         attribute(
@@ -173,35 +180,78 @@ public void setContexts(NameNodeLoader loader, VersionInterface versionLoader)
         attribute(
             "storageType",
             node -> versionLoader.getFilterFunctionToLongForINode("storageType").apply(node));
+  }
 
-    Collection<INode> files = loader.getINodeSetInternal("files");
-    Collection<INode> dirs = loader.getINodeSetInternal("dirs");
+  @SuppressWarnings("unchecked") /* We do unchecked casting to extract GSets */
+  @Override // QueryEngine
+  public void handleGSet(GSet<INode, INodeWithAdditionalFields> preloaded, FSNamesystem namesystem)
+      throws Exception {
+    if (preloaded != null) {
+      filterINodes(preloaded);
+      return;
+    }
 
-    indexedFiles =
-        new ConcurrentIndexedCollection<>(
-            WrappingPersistence.aroundCollectionOnPrimaryKey(files, id));
+    namesystem.writeLock();
+    try {
+      FSDirectory fsDirectory = namesystem.getFSDirectory();
+      INodeMap inodeMap = fsDirectory.getINodeMap();
+      Field mapField = inodeMap.getClass().getDeclaredField("map");
+      mapField.setAccessible(true);
+      GSet<INode, INodeWithAdditionalFields> gset =
+          (GSet<INode, INodeWithAdditionalFields>) mapField.get(inodeMap);
+
+      CollectionGSetWrapper newGSet = filterINodes(gset);
+      mapField.set(inodeMap, newGSet);
+    } finally {
+      namesystem.writeUnlock();
+    }
+  }
 
-    indexedDirs =
+  private CollectionGSetWrapper filterINodes(GSet<INode, INodeWithAdditionalFields> gset) {
+    final long start = System.currentTimeMillis();
+    GSetCollectionWrapper gcw = new GSetCollectionWrapper(gset);
+    all =
         new ConcurrentIndexedCollection<>(
-            WrappingPersistence.aroundCollectionOnPrimaryKey(dirs, id));
+            WrappingPersistence.aroundCollectionOnPrimaryKey(gcw, id));
+    ((IndexedCollection<INode>) all)
+        .addIndex(HashIndex.withQuantizerOnAttribute(LongQuantizer.withCompressionFactor(10), id));
+    ((IndexedCollection<INode>) all)
+        .addIndex(
+            HashIndex.onAttribute(
+                new DefaultIndexMapFactory<>(), new CompactValueSetFactory<>(), isFile));
+    ((IndexedCollection<INode>) all)
+        .addIndex(
+            HashIndex.onAttribute(
+                new DefaultIndexMapFactory<>(), new CompactValueSetFactory<>(), isDir));
+    final long end = System.currentTimeMillis();
+    LOG.info("Performing JC-QE filtering of files and dirs took: {} ms.", (end - start));
+    return new CollectionGSetWrapper((IndexedCollection<INode>) all, gset);
   }
 
+  @SuppressWarnings("unchecked") /* We do unchecked casting of the indexed query results */
   @Override // QueryEngine
   public Collection<INode> getINodeSet(String set) {
     long start = System.currentTimeMillis();
     Collection<INode> inodes;
     switch (set) {
       case "all":
-        inodes =
-            new ConcurrentIndexedCollection<>(
-                WrappingPersistence.aroundCollectionOnPrimaryKey(
-                    nameNodeLoader.getINodeSetInternal("all"), id));
+        inodes = all;
         break;
       case "files":
-        inodes = indexedFiles;
+        inodes =
+            (Collection<INode>)
+                ((IndexedCollection<INode>) all)
+                    .retrieve(equal(isFile, Boolean.TRUE))
+                    .stream()
+                    .collect(Collectors.toSet());
         break;
       case "dirs":
-        inodes = indexedDirs;
+        inodes =
+            (Collection<INode>)
+                ((IndexedCollection<INode>) all)
+                    .retrieve(equal(isDir, Boolean.TRUE))
+                    .stream()
+                    .collect(Collectors.toSet());
         break;
       default:
         throw new IllegalArgumentException(
@@ -209,7 +259,7 @@ public Collection<INode> getINodeSet(String set) {
     }
     long end = System.currentTimeMillis();
     LOG.info(
-        "Fetching indexed set of: {} had result size: {} and took: {} ms.",
+        "Fetching set of: {} (by index) had result size: {} and took: {} ms.",
         set,
         inodes.size(),
         (end - start));
@@ -588,6 +638,7 @@ public HttpServletResponse sql(
     attributes.put("hasQuota", hasQuota);
 
     SQLParser<INode> parser = SQLParser.forPojoWithAttributes(INode.class, attributes);
+    IndexedCollection<INode> inodes;
 
     long count = 0;
     try (PrintWriter out = res.getWriter()) {
@@ -599,7 +650,20 @@ public HttpServletResponse sql(
       } else {
         sql = formData.getFirst("sqlStatement");
       }
-      ResultSet<INode> results = parser.retrieve(indexedFiles, sql);
+
+      if (sql.contains("FILES")) {
+        inodes =
+            new ConcurrentIndexedCollection<>(
+                WrappingPersistence.aroundCollectionOnPrimaryKey(getINodeSet("files"), id));
+      } else if (sql.contains("DIRS")) {
+        inodes =
+            new ConcurrentIndexedCollection<>(
+                WrappingPersistence.aroundCollectionOnPrimaryKey(getINodeSet("dirs"), id));
+      } else {
+        throw new IllegalArgumentException("SQL statement must select from either FILES or DIRS.");
+      }
+
+      ResultSet<INode> results = parser.retrieve(inodes, sql);
       for (INode inode : results) {
         out.println(inode.getFullPathName());
         count++;
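With the single indexed "all" collection, getINodeSet("files"/"dirs") above becomes a hash-index lookup on the isFile/isDir attributes instead of a second and third indexed copy. A hedged sketch of that retrieve-by-attribute pattern on a toy POJO — CQEngine 2.x; Entry and IS_FILE are illustrative, not NNA code:

    import static com.googlecode.cqengine.query.QueryFactory.equal;

    import com.googlecode.cqengine.ConcurrentIndexedCollection;
    import com.googlecode.cqengine.IndexedCollection;
    import com.googlecode.cqengine.attribute.SimpleAttribute;
    import com.googlecode.cqengine.index.hash.HashIndex;
    import com.googlecode.cqengine.query.option.QueryOptions;

    public class RetrieveSketch {

      static class Entry {
        final boolean isFile;

        Entry(boolean isFile) {
          this.isFile = isFile;
        }
      }

      // Boolean attribute analogous to the isFile attribute defined above.
      static final SimpleAttribute<Entry, Boolean> IS_FILE =
          new SimpleAttribute<Entry, Boolean>("isFile") {
            @Override
            public Boolean getValue(Entry entry, QueryOptions queryOptions) {
              return entry.isFile;
            }
          };

      public static void main(String[] args) {
        IndexedCollection<Entry> entries = new ConcurrentIndexedCollection<>();
        entries.addIndex(HashIndex.onAttribute(IS_FILE)); // backs the equal() query
        entries.add(new Entry(true));
        entries.add(new Entry(true));
        entries.add(new Entry(false));

        long files = entries.retrieve(equal(IS_FILE, Boolean.TRUE)).stream().count();
        System.out.println(files); // 2
      }
    }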
diff --git a/src/main/java/org/apache/hadoop/hdfs/server/namenode/NameNodeLoader.java b/src/main/java/org/apache/hadoop/hdfs/server/namenode/NameNodeLoader.java
index 7c8a4eb6..1fee94e1 100644
--- a/src/main/java/org/apache/hadoop/hdfs/server/namenode/NameNodeLoader.java
+++ b/src/main/java/org/apache/hadoop/hdfs/server/namenode/NameNodeLoader.java
@@ -22,18 +22,14 @@
 import java.io.IOException;
 import java.io.PrintWriter;
 import java.io.RandomAccessFile;
-import java.lang.reflect.Field;
 import java.net.InetAddress;
 import java.net.URI;
 import java.net.URISyntaxException;
 import java.sql.SQLException;
 import java.util.Collection;
-import java.util.Map;
 import java.util.concurrent.ExecutorService;
 import java.util.concurrent.Future;
 import java.util.concurrent.atomic.AtomicBoolean;
-import java.util.stream.Collectors;
-import java.util.stream.StreamSupport;
 import javax.servlet.http.HttpServletResponse;
 import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.fs.CommonConfigurationKeys;
@@ -55,9 +51,7 @@
 import org.apache.hadoop.security.SecurityUtil;
 import org.apache.hadoop.security.UserGroupInformation;
 import org.apache.hadoop.security.token.delegation.TokenExtractor;
-import org.apache.hadoop.util.CollectionsView;
 import org.apache.hadoop.util.GSet;
-import org.apache.hadoop.util.GSetSeperatorWrapper;
 import org.apache.hadoop.util.ReflectionUtils;
 import org.codehaus.jackson.JsonFactory;
 import org.codehaus.jackson.JsonGenerator;
@@ -77,9 +71,6 @@ public class NameNodeLoader {
   private Configuration conf = null;
   private FSNamesystem namesystem = null;
   private HsqlDriver hsqlDriver = null;
-  private Collection<INode> all = null;
-  private Map<INode, INodeWithAdditionalFields> files = null;
-  private Map<INode, INodeWithAdditionalFields> dirs = null;
   private TokenExtractor tokenExtractor = null;
 
   /** Constructor. */
@@ -321,8 +312,7 @@ public void load(
       GSet<INode, INodeWithAdditionalFields> preloadedInodes,
       Configuration preloadedHadoopConf,
       ApplicationConfiguration nnaConf)
-      throws IOException, NoSuchFieldException, IllegalAccessException, URISyntaxException,
-          ClassNotFoundException {
+      throws Exception {
     /*
      * Configuration standard is: /etc/hadoop/conf.
      * Goal is to let configuration tell us where the FsImage and EditLogs are for loading.
@@ -342,7 +332,6 @@ public void load(
     handleConfigurationOverrides(conf, nnaConf);
 
     final long start = System.currentTimeMillis();
-    GSet<INode, INodeWithAdditionalFields> gsetMap;
     if (preloadedInodes == null) {
       UserGroupInformation.setConfiguration(conf);
       reloadKeytab();
@@ -356,56 +345,31 @@ public void load(
         namesystem = FSNamesystem.loadFromDisk(conf);
         namesystem.setSafeMode(HdfsConstants.SafeModeAction.SAFEMODE_ENTER);
       } catch (IOException e) {
-        LOG.info("Failed to load namesystem: {}", e);
+        LOG.error("Failed to load namesystem.", e);
         return;
       }
       long end1 = System.currentTimeMillis();
       LOG.info("FSImage loaded in: {} ms.", (end1 - start1));
       LOG.info("Loaded in {} Inodes", namesystem.getFilesTotal());
 
-      namesystem.writeLock();
       tokenExtractor = new TokenExtractor(namesystem.dtSecretManager, namesystem);
-      FSDirectory fsDirectory = namesystem.getFSDirectory();
-      INodeMap inodeMap = fsDirectory.getINodeMap();
-      Field mapField = inodeMap.getClass().getDeclaredField("map");
-      mapField.setAccessible(true);
-      gsetMap = (GSet<INode, INodeWithAdditionalFields>) mapField.get(inodeMap);
     } else {
-      gsetMap = preloadedInodes;
       tokenExtractor = new TokenExtractor(null, null);
     }
 
-    final long s1 = System.currentTimeMillis();
-    files =
-        StreamSupport.stream(gsetMap.spliterator(), true)
-            .filter(INode::isFile)
-            .collect(Collectors.toConcurrentMap(node -> node, node -> node));
-    dirs =
-        StreamSupport.stream(gsetMap.spliterator(), true)
-            .filter(INode::isDirectory)
-            .collect(Collectors.toConcurrentMap(node -> node, node -> node));
-    all = CollectionsView.combine(files.keySet(), dirs.keySet());
-    long e1 = System.currentTimeMillis();
-    LOG.info("Filtering {} files and {} dirs took: {} ms.", files.size(), dirs.size(), (e1 - s1));
+    // Let QueryEngine deal with inode set from here.
+    queryEngine.handleGSet(preloadedInodes, namesystem);
 
     if (preloadedInodes == null) {
       // Start tailing and updating security credentials threads.
       try {
-        FSDirectory fsDirectory = namesystem.getFSDirectory();
-        INodeMap inodeMap = fsDirectory.getINodeMap();
-        Field mapField = inodeMap.getClass().getDeclaredField("map");
-        mapField.setAccessible(true);
-        GSet<INode, INodeWithAdditionalFields> newGSet = new GSetSeperatorWrapper(files, dirs);
-        mapField.set(inodeMap, newGSet);
-        namesystem.writeUnlock();
-
         namesystem.startStandbyServices(conf);
         versionLoader.setNamesystem(namesystem);
       } catch (Throwable e) {
-        LOG.info("ERROR: Failed to start EditLogTailer: {}", e);
+        LOG.error("Failed to start EditLogTailer.", e);
       }
     }
 
-    queryEngine.setContexts(this, versionLoader);
+    queryEngine.setVersionContext(versionLoader);
 
     long end = System.currentTimeMillis();
     LOG.info("NameNodeLoader bootstrap'd in: {} ms.", (end - start));
@@ -511,15 +475,7 @@ public void clear() {
         namesystem = null;
       }
     }
-    if (all != null) {
-      all.clear();
-    }
-    if (files != null) {
-      files.clear();
-    }
-    if (dirs != null) {
-      dirs.clear();
-    }
+    queryEngine.clear();
     inited.set(false);
   }
 
@@ -557,32 +513,6 @@ public Collection<INode> getINodeSet(String set) {
     return queryEngine.getINodeSet(set);
   }
 
-  Collection<INode> getINodeSetInternal(String set) {
-    long start = System.currentTimeMillis();
-    Collection<INode> inodes;
-    switch (set) {
-      case "all":
-        inodes = all;
-        break;
-      case "files":
-        inodes = files.keySet();
-        break;
-      case "dirs":
-        inodes = dirs.keySet();
-        break;
-      default:
-        throw new IllegalArgumentException(
-            "You did not specify a set to use. Please check /sets for available sets.");
-    }
-    long end = System.currentTimeMillis();
-    LOG.info(
-        "Fetching set of: {} had result size: {} and took: {} ms.",
-        set,
-        inodes.size(),
-        (end - start));
-    return inodes;
-  }
-
   /**
    * Initializes the background thread that performs cached reporting for all users. Initializes the
    * background thread that refreshes Kerberos keytab for NNA process.
@@ -598,15 +528,15 @@ public void initReloadThreads(ExecutorService internalService, ApplicationConfig
             try {
               suggestionsEngine.reloadSuggestions(this);
             } catch (Throwable e) {
-              LOG.info("Suggestion reload failed: {}", e);
+              LOG.info("Suggestion reload failed!", e);
               for (StackTraceElement element : e.getStackTrace()) {
                 LOG.info(element.toString());
               }
             }
             try {
               Thread.sleep(conf.getSuggestionsReloadSleepMs());
-            } catch (InterruptedException ignored) {
-              LOG.debug("Suggestion reload was interrupted by: {}", ignored);
+            } catch (InterruptedException ex) {
+              LOG.debug("Suggestion reload was interrupted.", ex);
             }
           }
         });
@@ -617,8 +547,8 @@ public void initReloadThreads(ExecutorService internalService, ApplicationConfig
           // Reload Keytab every 10 minutes.
           try {
             Thread.sleep(10 * 60 * 1000L);
-          } catch (InterruptedException ignored) {
-            LOG.debug("Keytab refresh was interrupted by: {}", ignored);
+          } catch (InterruptedException ex) {
+            LOG.debug("Keytab refresh was interrupted.", ex);
           }
           reloadKeytab();
         }
diff --git a/src/main/java/org/apache/hadoop/hdfs/server/namenode/QueryEngine.java b/src/main/java/org/apache/hadoop/hdfs/server/namenode/QueryEngine.java
index cfa17e38..e848f29d 100644
--- a/src/main/java/org/apache/hadoop/hdfs/server/namenode/QueryEngine.java
+++ b/src/main/java/org/apache/hadoop/hdfs/server/namenode/QueryEngine.java
@@ -25,6 +25,7 @@
 import java.util.Map;
 import java.util.function.Function;
 import javax.servlet.http.HttpServletResponse;
+import org.apache.hadoop.util.GSet;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
@@ -32,7 +33,10 @@ public interface QueryEngine {
 
   Logger LOG = LoggerFactory.getLogger(QueryEngine.class.getName());
 
-  void setContexts(NameNodeLoader nameNodeLoader, VersionInterface versionLoader);
+  void setVersionContext(VersionInterface versionLoader);
+
+  void handleGSet(GSet<INode, INodeWithAdditionalFields> gset, FSNamesystem namesystem)
+      throws Exception;
 
   Collection<INode> getINodeSet(String set);
 
@@ -117,4 +121,6 @@ Map<String, Function<INode, Long>> removeKeysOnConditional2(
   List<Function<Long, Boolean>> createComparisons(String conditionsStr);
 
   boolean check(List<Function<Long, Boolean>> comparisons, long value);
+
+  void clear();
 }
diff --git a/src/main/java/org/apache/hadoop/util/CollectionsView.java b/src/main/java/org/apache/hadoop/util/CollectionsView.java
index 17415016..b5c4a4e9 100644
--- a/src/main/java/org/apache/hadoop/util/CollectionsView.java
+++ b/src/main/java/org/apache/hadoop/util/CollectionsView.java
@@ -22,6 +22,7 @@
 import com.google.common.collect.Iterables;
 import java.util.Collection;
 import java.util.Iterator;
+import org.jetbrains.annotations.NotNull;
 
 public final class CollectionsView {
 
@@ -29,7 +30,7 @@ static class JoinedCollectionView<E> implements Collection<E> {
 
     private final Collection<? extends E>[] items;
 
-    public JoinedCollectionView(final Collection<? extends E>[] items) {
+    JoinedCollectionView(final Collection<? extends E>[] items) {
       this.items = items;
     }
 
@@ -60,6 +61,7 @@ public boolean isEmpty() {
       return !iterator().hasNext();
     }
 
+    @NotNull
     @Override
     public Iterator<E> iterator() {
       return Iterables.concat(items).iterator();
@@ -89,11 +91,13 @@ public int size() {
       return ct;
     }
 
+    @NotNull
    @Override
    public Object[] toArray() {
      throw new UnsupportedOperationException();
    }
 
+    @NotNull
    @Override
    public <T> T[] toArray(T[] a) {
      throw new UnsupportedOperationException();
@@ -114,6 +118,7 @@ public boolean add(E e) {
    *
    * <p>None of the above methods is thread safe (nor would there be an easy way of making them).
    */
+  @SuppressWarnings("unchecked") /* Unchecked generic array creation from varargs */
   public static <E> Collection<E> combine(final Collection<? extends E>... items) {
     return new JoinedCollectionView<>(items);
   }
diff --git a/src/main/java/org/apache/hadoop/util/GSetCollectionWrapper.java b/src/main/java/org/apache/hadoop/util/GSetCollectionWrapper.java
new file mode 100644
index 00000000..a2216ea8
--- /dev/null
+++ b/src/main/java/org/apache/hadoop/util/GSetCollectionWrapper.java
@@ -0,0 +1,84 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.hadoop.util;
+
+import java.util.AbstractCollection;
+import java.util.Iterator;
+import org.apache.hadoop.hdfs.server.namenode.INode;
+import org.apache.hadoop.hdfs.server.namenode.INodeWithAdditionalFields;
+import org.jetbrains.annotations.NotNull;
+
+public class GSetCollectionWrapper extends AbstractCollection<INode> {
+
+  private final GSet<INode, INodeWithAdditionalFields> gset;
+
+  public GSetCollectionWrapper(GSet<INode, INodeWithAdditionalFields> gset) {
+    this.gset = gset;
+  }
+
+  @Override // AbstractCollection
+  public int size() {
+    return gset.size();
+  }
+
+  @Override // AbstractCollection
+  public boolean contains(Object o) {
+    if (o instanceof INode) {
+      return gset.contains((INode) o);
+    }
+    return false;
+  }
+
+  @SuppressWarnings("unchecked")
+  @NotNull
+  @Override // AbstractCollection
+  public Iterator<INode> iterator() {
+    Iterator<? extends INode> iterator = gset.iterator();
+    return (Iterator<INode>) iterator;
+  }
+
+  @Override // AbstractCollection
+  public boolean add(INode element) {
+    if (element instanceof INodeWithAdditionalFields) {
+      gset.put((INodeWithAdditionalFields) element);
+      return true;
+    } else {
+      return false;
+    }
+  }
+
+  @Override // AbstractCollection
+  public boolean remove(Object o) {
+    if (o instanceof INode) {
+      INodeWithAdditionalFields removed = gset.remove((INode) o);
+      return (removed != null);
+    }
+    return false;
+  }
+
+  @Override // AbstractCollection
+  public void clear() {
+    gset.clear();
+  }
+
+  public GSet<INode, INodeWithAdditionalFields> getBackingSet() {
+    return gset;
+  }
+}
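GSetCollectionWrapper is the adapter in the other direction: it gives a GSet a java.util.Collection face so WrappingPersistence can treat the inode map as an ordinary collection. AbstractCollection only demands iterator() and size(); mutation support comes from overriding add()/remove(). A minimal sketch of the same idiom over a plain concurrent map — illustrative types, not NNA code:

    import java.util.AbstractCollection;
    import java.util.Iterator;
    import java.util.concurrent.ConcurrentHashMap;

    public class MapValuesCollectionSketch extends AbstractCollection<String> {

      private final ConcurrentHashMap<String, String> backing = new ConcurrentHashMap<>();

      @Override
      public Iterator<String> iterator() {
        return backing.values().iterator();
      }

      @Override
      public int size() {
        return backing.size();
      }

      @Override
      public boolean add(String element) {
        // Like GSetCollectionWrapper.add(), translate Collection.add into the
        // underlying structure's own put operation.
        return backing.put(element, element) == null;
      }

      public static void main(String[] args) {
        MapValuesCollectionSketch c = new MapValuesCollectionSketch();
        c.add("a");
        System.out.println(c.contains("a") + " / size=" + c.size()); // true / size=1
      }
    }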
diff --git a/src/test/java/org/apache/hadoop/hdfs/server/namenode/analytics/TestNNAnalyticsBase.java b/src/test/java/org/apache/hadoop/hdfs/server/namenode/analytics/TestNNAnalyticsBase.java
index 17febba3..0e74f3cd 100644
--- a/src/test/java/org/apache/hadoop/hdfs/server/namenode/analytics/TestNNAnalyticsBase.java
+++ b/src/test/java/org/apache/hadoop/hdfs/server/namenode/analytics/TestNNAnalyticsBase.java
@@ -23,6 +23,7 @@
 import static org.hamcrest.CoreMatchers.hasItem;
 import static org.hamcrest.CoreMatchers.notNullValue;
 import static org.hamcrest.MatcherAssert.assertThat;
+import static org.hamcrest.Matchers.greaterThan;
 import static org.hamcrest.core.Is.is;
 import static org.hamcrest.core.IsNot.not;
 import static org.hamcrest.core.StringContains.containsString;
@@ -504,7 +505,7 @@ public void testFindMinAccessTimeHistogramCSV() throws IOException, ParseExcepti
   }
 
   @Test
-  public void testFindMinAccessTimeHistogramRawTimestampCSV() throws IOException, ParseException {
+  public void testFindMinAccessTimeHistogramRawTimestampCSV() throws IOException {
     HttpGet get =
         new HttpGet(
             "http://localhost:4567/histogram?set=files&type=user&find=min:accessTime&histogramOutput=csv&rawTimestamps=true");
@@ -1236,15 +1237,31 @@ public void testHistogramTypeAndFindQuery() throws IOException {
   }
 
   @Test
-  public void testSQL() throws IOException {
+  public void testSQL1() throws IOException {
     HttpPost post = new HttpPost("http://localhost:4567/sql");
     List<NameValuePair> postParams = new ArrayList<>();
-    postParams.add(new BasicNameValuePair("sqlStatement", "SELECT * FROM files"));
+    postParams.add(
+        new BasicNameValuePair("sqlStatement", "SELECT * FROM FILES WHERE fileSize = 0"));
     post.setEntity(new UrlEncodedFormEntity(postParams, "UTF-8"));
     HttpResponse res = client.execute(hostPort, post);
     if (res.getStatusLine().getStatusCode() != HttpStatus.SC_NOT_FOUND) {
       List<String> text = IOUtils.readLines(res.getEntity().getContent());
-      assertThat(text.size(), is(555000));
+      assertThat(text.size(), is(greaterThan(100)));
+      assertThat(res.getStatusLine().getStatusCode(), is(HttpStatus.SC_OK));
+    }
+  }
+
+  @Test
+  public void testSQL2() throws IOException {
+    HttpPost post = new HttpPost("http://localhost:4567/sql");
+    List<NameValuePair> postParams = new ArrayList<>();
+    postParams.add(
+        new BasicNameValuePair("sqlStatement", "SELECT * FROM DIRS WHERE dirNumChildren = 0"));
+    post.setEntity(new UrlEncodedFormEntity(postParams, "UTF-8"));
+    HttpResponse res = client.execute(hostPort, post);
+    if (res.getStatusLine().getStatusCode() != HttpStatus.SC_NOT_FOUND) {
+      List<String> text = IOUtils.readLines(res.getEntity().getContent());
+      assertThat(text.size(), is(0));
       assertThat(res.getStatusLine().getStatusCode(), is(HttpStatus.SC_OK));
     }
   }
diff --git a/src/test/java/org/apache/hadoop/hdfs/server/namenode/analytics/TestWithMiniClusterBase.java b/src/test/java/org/apache/hadoop/hdfs/server/namenode/analytics/TestWithMiniClusterBase.java
index 4a191276..46ffe223 100644
--- a/src/test/java/org/apache/hadoop/hdfs/server/namenode/analytics/TestWithMiniClusterBase.java
+++ b/src/test/java/org/apache/hadoop/hdfs/server/namenode/analytics/TestWithMiniClusterBase.java
@@ -78,7 +78,7 @@ public static void tearDown() throws IOException {
   }
 
   @Before
-  public void before() throws IOException {
+  public void before() {
     client = new DefaultHttpClient();
 
     boolean isServingQueries = false;
@@ -192,7 +192,7 @@ private void executeSaveNamespace(HttpGet get) throws IOException {
   }
 
   @Test
-  public void testTokenExtractor() throws IOException {
+  public void testTokenExtractor() {
     Map<String, Long> tokenLastLogins = nna.getLoader().getTokenExtractor().getTokenLastLogins();
     assertThat(tokenLastLogins.size(), is(0));
   }
@@ -211,7 +211,7 @@ public void testUpdateSeen() throws Exception {
     // Ensure NNA sees those updates in query.
     int checkCount;
     do {
-      HttpGet check = new HttpGet("http://localhost:4567/filter?set=dirs&sum=count");
+      HttpGet check = new HttpGet("http://localhost:4567/filter?set=files&sum=count");
       HttpResponse checkRes = client.execute(hostPort, check);
       assertThat(checkRes.getStatusLine().getStatusCode(), is(200));
       String checkContent = IOUtils.toString(checkRes.getEntity().getContent());
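For reference, the /sql endpoint exercised by testSQL1/testSQL2 can be driven the same way from any HTTP client. A hedged, standalone example using the same (legacy) Apache HttpClient APIs as the tests — the host and port come from the test fixtures and would differ in a real deployment:

    import java.util.ArrayList;
    import java.util.List;
    import org.apache.commons.io.IOUtils;
    import org.apache.http.HttpHost;
    import org.apache.http.HttpResponse;
    import org.apache.http.NameValuePair;
    import org.apache.http.client.entity.UrlEncodedFormEntity;
    import org.apache.http.client.methods.HttpPost;
    import org.apache.http.impl.client.DefaultHttpClient;
    import org.apache.http.message.BasicNameValuePair;

    public class SqlEndpointExample {

      public static void main(String[] args) throws Exception {
        HttpPost post = new HttpPost("http://localhost:4567/sql");
        List<NameValuePair> postParams = new ArrayList<>();
        postParams.add(
            new BasicNameValuePair("sqlStatement", "SELECT * FROM FILES WHERE fileSize = 0"));
        post.setEntity(new UrlEncodedFormEntity(postParams, "UTF-8"));

        HttpResponse res = new DefaultHttpClient().execute(new HttpHost("localhost", 4567), post);

        // The endpoint streams one full inode path per line of the response body.
        List<String> lines = IOUtils.readLines(res.getEntity().getContent(), "UTF-8");
        lines.forEach(System.out::println);
      }
    }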