diff --git a/core/src/main/java/org/apache/accumulo/core/Constants.java b/core/src/main/java/org/apache/accumulo/core/Constants.java index 411af753cfe..f6ff8554258 100644 --- a/core/src/main/java/org/apache/accumulo/core/Constants.java +++ b/core/src/main/java/org/apache/accumulo/core/Constants.java @@ -97,6 +97,7 @@ public class Constants { public static final String BULK_PREFIX = "b-"; public static final String BULK_RENAME_FILE = "renames.json"; public static final String BULK_LOAD_MAPPING = "loadmap.json"; + public static final String BULK_WORKING_PREFIX = "accumulo-bulk-"; public static final String CLONE_PREFIX = "c-"; public static final byte[] CLONE_PREFIX_BYTES = CLONE_PREFIX.getBytes(UTF_8); diff --git a/core/src/main/java/org/apache/accumulo/core/client/rfile/LoadPlanCollector.java b/core/src/main/java/org/apache/accumulo/core/client/rfile/LoadPlanCollector.java new file mode 100644 index 00000000000..511e3fed51c --- /dev/null +++ b/core/src/main/java/org/apache/accumulo/core/client/rfile/LoadPlanCollector.java @@ -0,0 +1,131 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * https://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. 
+ */ +package org.apache.accumulo.core.client.rfile; + +import java.util.HashSet; +import java.util.Set; + +import org.apache.accumulo.core.data.Key; +import org.apache.accumulo.core.data.LoadPlan; +import org.apache.accumulo.core.data.TableId; +import org.apache.accumulo.core.dataImpl.KeyExtent; +import org.apache.hadoop.io.Text; + +import com.google.common.base.Preconditions; + +class LoadPlanCollector { + + private final LoadPlan.SplitResolver splitResolver; + private boolean finished = false; + private Text lgFirstRow; + private Text lgLastRow; + private Text firstRow; + private Text lastRow; + private Set overlappingExtents; + private KeyExtent currentExtent; + private long appended = 0; + + LoadPlanCollector(LoadPlan.SplitResolver splitResolver) { + this.splitResolver = splitResolver; + this.overlappingExtents = new HashSet<>(); + } + + LoadPlanCollector() { + splitResolver = null; + this.overlappingExtents = null; + + } + + private void appendNoSplits(Key key) { + if (lgFirstRow == null) { + lgFirstRow = key.getRow(); + lgLastRow = lgFirstRow; + } else { + var row = key.getRow(); + lgLastRow = row; + } + } + + private static final TableId FAKE_ID = TableId.of("123"); + + private void appendSplits(Key key) { + var row = key.getRow(); + if (currentExtent == null || !currentExtent.contains(row)) { + var tableSplits = splitResolver.apply(row); + var extent = new KeyExtent(FAKE_ID, tableSplits.getEndRow(), tableSplits.getPrevRow()); + Preconditions.checkState(extent.contains(row), "%s does not contain %s", tableSplits, row); + if (currentExtent != null) { + overlappingExtents.add(currentExtent); + } + currentExtent = extent; + } + } + + public void append(Key key) { + if (splitResolver == null) { + appendNoSplits(key); + } else { + appendSplits(key); + } + appended++; + } + + public void startLocalityGroup() { + if (lgFirstRow != null) { + if (firstRow == null) { + firstRow = lgFirstRow; + lastRow = lgLastRow; + } else { + // take the minimum + firstRow = firstRow.compareTo(lgFirstRow) < 0 ? firstRow : lgFirstRow; + // take the maximum + lastRow = lastRow.compareTo(lgLastRow) > 0 ? 
lastRow : lgLastRow; + } + lgFirstRow = null; + lgLastRow = null; + } + } + + public LoadPlan getLoadPlan(String filename) { + Preconditions.checkState(finished, "Attempted to get load plan before closing"); + + if (appended == 0) { + return LoadPlan.builder().build(); + } + + if (splitResolver == null) { + return LoadPlan.builder().loadFileTo(filename, LoadPlan.RangeType.FILE, firstRow, lastRow) + .build(); + } else { + var builder = LoadPlan.builder(); + overlappingExtents.add(currentExtent); + for (var extent : overlappingExtents) { + builder.loadFileTo(filename, LoadPlan.RangeType.TABLE, extent.prevEndRow(), + extent.endRow()); + } + return builder.build(); + } + } + + public void close() { + finished = true; + // compute the overall min and max rows + startLocalityGroup(); + } +} diff --git a/core/src/main/java/org/apache/accumulo/core/client/rfile/RFile.java b/core/src/main/java/org/apache/accumulo/core/client/rfile/RFile.java index 64956dcbc20..8f96643143b 100644 --- a/core/src/main/java/org/apache/accumulo/core/client/rfile/RFile.java +++ b/core/src/main/java/org/apache/accumulo/core/client/rfile/RFile.java @@ -35,6 +35,7 @@ import org.apache.accumulo.core.client.summary.Summary.FileStatistics; import org.apache.accumulo.core.conf.Property; import org.apache.accumulo.core.data.Key; +import org.apache.accumulo.core.data.LoadPlan; import org.apache.accumulo.core.data.Range; import org.apache.accumulo.core.security.Authorizations; import org.apache.accumulo.core.util.RowRangeUtil; @@ -463,6 +464,15 @@ default WriterOptions withSummarizers(SummarizerConfiguration... summarizerConf) */ WriterOptions withVisibilityCacheSize(int maxSize); + /** + * @param splitResolver builds a {@link LoadPlan} using table split points provided by the given + * splitResolver. + * @return this + * @see RFileWriter#getLoadPlan(String) + * @since 3.1.0 + */ + WriterOptions withSplitResolver(LoadPlan.SplitResolver splitResolver); + /** * @return a new RfileWriter created with the options previously specified. */ diff --git a/core/src/main/java/org/apache/accumulo/core/client/rfile/RFileWriter.java b/core/src/main/java/org/apache/accumulo/core/client/rfile/RFileWriter.java index b4d6def4a23..e337d2c5b3d 100644 --- a/core/src/main/java/org/apache/accumulo/core/client/rfile/RFileWriter.java +++ b/core/src/main/java/org/apache/accumulo/core/client/rfile/RFileWriter.java @@ -29,6 +29,7 @@ import org.apache.accumulo.core.data.ArrayByteSequence; import org.apache.accumulo.core.data.ByteSequence; import org.apache.accumulo.core.data.Key; +import org.apache.accumulo.core.data.LoadPlan; import org.apache.accumulo.core.data.Value; import org.apache.accumulo.core.file.FileSKVWriter; import org.apache.accumulo.core.security.ColumnVisibility; @@ -92,12 +93,17 @@ public class RFileWriter implements AutoCloseable { private final FileSKVWriter writer; private final LRUMap validVisibilities; + + // TODO should be able to completely remove this as lower level code is already doing some things + // like tracking first and last keys per LG. Added to get simple initial impl before optimizing. 
+ private final LoadPlanCollector loadPlanCollector; private boolean startedLG; private boolean startedDefaultLG; - RFileWriter(FileSKVWriter fileSKVWriter, int visCacheSize) { + RFileWriter(FileSKVWriter fileSKVWriter, int visCacheSize, LoadPlanCollector loadPlanCollector) { this.writer = fileSKVWriter; this.validVisibilities = new LRUMap<>(visCacheSize); + this.loadPlanCollector = loadPlanCollector; } private void _startNewLocalityGroup(String name, Set columnFamilies) @@ -106,6 +112,7 @@ private void _startNewLocalityGroup(String name, Set columnFamilie "Cannot start a locality group after starting the default locality group"); writer.startNewLocalityGroup(name, columnFamilies); startedLG = true; + loadPlanCollector.startLocalityGroup(); } /** @@ -175,6 +182,7 @@ public void startNewLocalityGroup(String name, String... families) throws IOExce public void startDefaultLocalityGroup() throws IOException { Preconditions.checkState(!startedDefaultLG); + loadPlanCollector.startLocalityGroup(); writer.startDefaultLocalityGroup(); startedDefaultLG = true; startedLG = true; @@ -204,6 +212,7 @@ public void append(Key key, Value val) throws IOException { validVisibilities.put(new ArrayByteSequence(Arrays.copyOf(cv, cv.length)), Boolean.TRUE); } writer.append(key, val); + loadPlanCollector.append(key); } /** @@ -250,5 +259,27 @@ public void append(Iterable> keyValues) throws IOException { @Override public void close() throws IOException { writer.close(); + loadPlanCollector.close(); + } + + /** + * If no split resolver was provided when the RFileWriter was built then this method will return a + * simple load plan of type {@link org.apache.accumulo.core.data.LoadPlan.RangeType#FILE} using + * the first and last row seen. If a splitResolver was provided then this will return a load plan + * of type {@link org.apache.accumulo.core.data.LoadPlan.RangeType#TABLE} that has the split + * ranges the rows written overlapped. + * + * @param filename This file name will be used in the load plan and it should match the name that + * will be used when bulk importing this file. Only a filename is needed, not a full path. + * @return load plan computed from the keys written to the rfile. 
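+   *
+   * <p>
+   * A hypothetical usage sketch (not part of this patch); the path, filesystem variable,
+   * table name, and data below are illustrative only:
+   *
+   * <pre>{@code
+   * SortedSet<Text> splits = new TreeSet<>(client.tableOperations().listSplits("mytable"));
+   * RFileWriter writer = RFile.newWriter().to("/tmp/import/f1.rf").withFileSystem(fs)
+   *     .withSplitResolver(LoadPlan.SplitResolver.from(splits)).build();
+   * try (writer) {
+   *   writer.startDefaultLocalityGroup();
+   *   writer.append(new Key("row1", "fam1"), "val1");
+   * }
+   * LoadPlan plan = writer.getLoadPlan("f1.rf");
+   * }</pre>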
+   * @see org.apache.accumulo.core.client.rfile.RFile.WriterOptions#withSplitResolver(LoadPlan.SplitResolver) +   * @since 3.1.0 +   * @throws IllegalStateException if attempting to get load plan before calling {@link #close()} +   * @throws IllegalArgumentException if a full path is passed instead of a filename +   */ +  public LoadPlan getLoadPlan(String filename) { +    Preconditions.checkArgument(!filename.contains("/"), +        "Unexpected path %s seen instead of file name", filename); +    return loadPlanCollector.getLoadPlan(filename); } } diff --git a/core/src/main/java/org/apache/accumulo/core/client/rfile/RFileWriterBuilder.java b/core/src/main/java/org/apache/accumulo/core/client/rfile/RFileWriterBuilder.java index b1d79573380..70428345b5c 100644 --- a/core/src/main/java/org/apache/accumulo/core/client/rfile/RFileWriterBuilder.java +++ b/core/src/main/java/org/apache/accumulo/core/client/rfile/RFileWriterBuilder.java @@ -37,6 +37,7 @@ import org.apache.accumulo.core.conf.ConfigurationCopy; import org.apache.accumulo.core.conf.DefaultConfiguration; import org.apache.accumulo.core.crypto.CryptoFactoryLoader; +import org.apache.accumulo.core.data.LoadPlan; import org.apache.accumulo.core.file.FileOperations; import org.apache.accumulo.core.metadata.UnreferencedTabletFile; import org.apache.accumulo.core.metadata.ValidationUtil; @@ -73,6 +74,7 @@ OutputStream getOutputStream() { private int visCacheSize = 1000; private Map<String,String> samplerProps = Collections.emptyMap(); private Map<String,String> summarizerProps = Collections.emptyMap(); + private LoadPlan.SplitResolver splitResolver; private void checkDisjoint(Map<String,String> props, Map<String,String> derivedProps, String kind) { @@ -107,6 +109,13 @@ public RFileWriter build() throws IOException { CryptoService cs = CryptoFactoryLoader.getServiceForClient(CryptoEnvironment.Scope.TABLE, tableConfig); + LoadPlanCollector loadPlanCollector; + if (splitResolver != null) { + loadPlanCollector = new LoadPlanCollector(splitResolver); + } else { + loadPlanCollector = new LoadPlanCollector(); + } + if (out.getOutputStream() != null) { FSDataOutputStream fsdo; if (out.getOutputStream() instanceof FSDataOutputStream) { @@ -117,12 +126,13 @@ public RFileWriter build() throws IOException { return new RFileWriter( fileops.newWriterBuilder().forOutputStream(".rf", fsdo, out.getConf(), cs) .withTableConfiguration(acuconf).withStartDisabled().build(), - visCacheSize); + visCacheSize, loadPlanCollector); } else { return new RFileWriter(fileops.newWriterBuilder() .forFile(UnreferencedTabletFile.of(out.getFileSystem(), out.path), out.getFileSystem(), out.getConf(), cs) - .withTableConfiguration(acuconf).withStartDisabled().build(), visCacheSize); + .withTableConfiguration(acuconf).withStartDisabled().build(), visCacheSize, + loadPlanCollector); } } @@ -174,6 +184,12 @@ public WriterOptions withVisibilityCacheSize(int maxSize) { return this; } + @Override + public WriterOptions withSplitResolver(LoadPlan.SplitResolver splitResolver) { + this.splitResolver = splitResolver; + return this; + } + @Override public WriterOptions withSummarizers(SummarizerConfiguration...
summarizerConf) { Objects.requireNonNull(summarizerConf); diff --git a/core/src/main/java/org/apache/accumulo/core/clientImpl/bulk/BulkImport.java b/core/src/main/java/org/apache/accumulo/core/clientImpl/bulk/BulkImport.java index a5e7ff94b5f..899dfba13e2 100644 --- a/core/src/main/java/org/apache/accumulo/core/clientImpl/bulk/BulkImport.java +++ b/core/src/main/java/org/apache/accumulo/core/clientImpl/bulk/BulkImport.java @@ -47,6 +47,7 @@ import java.util.concurrent.ExecutionException; import java.util.concurrent.Executor; import java.util.concurrent.ExecutorService; +import java.util.function.Function; import java.util.stream.Stream; import org.apache.accumulo.core.Constants; @@ -313,19 +314,25 @@ public interface KeyExtentCache { KeyExtent lookup(Text row); } - public static List findOverlappingTablets(KeyExtentCache extentCache, - FileSKVIterator reader) throws IOException { + /** + * Function that will find a row in a file being bulk imported that is >= the row passed to the + * function. If there is no row then it should return null. + */ + public interface NextRowFunction { + Text apply(Text row) throws IOException; + } + + public static List findOverlappingTablets(Function rowToExtentResolver, + NextRowFunction nextRowFunction) throws IOException { List result = new ArrayList<>(); - Collection columnFamilies = Collections.emptyList(); Text row = new Text(); while (true) { - reader.seek(new Range(row, null), columnFamilies, false); - if (!reader.hasTop()) { + row = nextRowFunction.apply(row); + if (row == null) { break; } - row = reader.getTopKey().getRow(); - KeyExtent extent = extentCache.lookup(row); + KeyExtent extent = rowToExtentResolver.apply(row); result.add(extent); row = extent.endRow(); if (row != null) { @@ -345,12 +352,22 @@ private static Text nextRow(Text row) { } public static List findOverlappingTablets(ClientContext context, - KeyExtentCache extentCache, UnreferencedTabletFile file, FileSystem fs, + KeyExtentCache keyExtentCache, UnreferencedTabletFile file, FileSystem fs, Cache fileLenCache, CryptoService cs) throws IOException { try (FileSKVIterator reader = FileOperations.getInstance().newReaderBuilder() .forFile(file, fs, fs.getConf(), cs).withTableConfiguration(context.getConfiguration()) .withFileLenCache(fileLenCache).seekToBeginning().build()) { - return findOverlappingTablets(extentCache, reader); + + Collection columnFamilies = Collections.emptyList(); + NextRowFunction nextRowFunction = row -> { + reader.seek(new Range(row, null), columnFamilies, false); + if (!reader.hasTop()) { + return null; + } + return reader.getTopKey().getRow(); + }; + + return findOverlappingTablets(keyExtentCache::lookup, nextRowFunction); } } @@ -499,7 +516,7 @@ public static List filterInvalid(FileStatus[] files) { continue; } - if (FileOperations.getBulkWorkingFiles().contains(fname)) { + if (FileOperations.isBulkWorkingFile(fname)) { log.debug("{} is an internal working file, ignoring.", fileStatus.getPath()); continue; } diff --git a/core/src/main/java/org/apache/accumulo/core/data/LoadPlan.java b/core/src/main/java/org/apache/accumulo/core/data/LoadPlan.java index 311042d778e..1f99d1451a2 100644 --- a/core/src/main/java/org/apache/accumulo/core/data/LoadPlan.java +++ b/core/src/main/java/org/apache/accumulo/core/data/LoadPlan.java @@ -20,17 +20,31 @@ import static java.nio.charset.StandardCharsets.UTF_8; +import java.io.IOException; +import java.net.URI; import java.nio.file.Paths; import java.util.Arrays; +import java.util.Base64; import java.util.Collection; import 
java.util.List; +import java.util.Map; +import java.util.Objects; +import java.util.SortedSet; +import java.util.function.Function; +import java.util.stream.Collectors; import org.apache.accumulo.core.client.admin.TableOperations.ImportMappingOptions; +import org.apache.accumulo.core.client.rfile.RFile; +import org.apache.accumulo.core.clientImpl.bulk.BulkImport; +import org.apache.accumulo.core.dataImpl.KeyExtent; +import org.apache.hadoop.fs.Path; import org.apache.hadoop.io.Text; import com.google.common.base.Preconditions; import com.google.common.collect.ImmutableList; import com.google.common.primitives.UnsignedBytes; +import com.google.gson.Gson; +import com.google.gson.GsonBuilder; import edu.umd.cs.findbugs.annotations.SuppressFBWarnings; @@ -228,4 +242,225 @@ public LoadPlan build() { } }; } + + private static final TableId FAKE_ID = TableId.of("999"); + + private static class JsonDestination { + String fileName; + String startRow; + String endRow; + RangeType rangeType; + + JsonDestination() {} + + JsonDestination(Destination destination) { + fileName = destination.getFileName(); + startRow = destination.getStartRow() == null ? null + : Base64.getUrlEncoder().encodeToString(destination.getStartRow()); + endRow = destination.getEndRow() == null ? null + : Base64.getUrlEncoder().encodeToString(destination.getEndRow()); + rangeType = destination.getRangeType(); + } + + Destination toDestination() { + return new Destination(fileName, rangeType, + startRow == null ? null : Base64.getUrlDecoder().decode(startRow), + endRow == null ? null : Base64.getUrlDecoder().decode(endRow)); + } + } + + private static final class JsonAll { + List destinations; + + JsonAll() {} + + JsonAll(List destinations) { + this.destinations = + destinations.stream().map(JsonDestination::new).collect(Collectors.toList()); + } + + } + + private static final Gson gson = new GsonBuilder().disableJdkUnsafe().serializeNulls().create(); + + /** + * Serializes the load plan to json that looks like the following. The values of startRow and + * endRow field are base64 encoded using {@link Base64#getUrlEncoder()}. + * + *
+   * <pre>
+   * {
+   *   "destinations": [
+   *     {
+   *       "fileName": "f1.rf",
+   *       "startRow": null,
+   *       "endRow": "MDAz",
+   *       "rangeType": "TABLE"
+   *     },
+   *     {
+   *       "fileName": "f2.rf",
+   *       "startRow": "MDA0",
+   *       "endRow": "MDA3",
+   *       "rangeType": "FILE"
+   *     },
+   *     {
+   *       "fileName": "f1.rf",
+   *       "startRow": "MDA1",
+   *       "endRow": "MDA2",
+   *       "rangeType": "TABLE"
+   *     },
+   *     {
+   *       "fileName": "f3.rf",
+   *       "startRow": "MDA4",
+   *       "endRow": null,
+   *       "rangeType": "TABLE"
+   *     }
+   *   ]
+   * }
+   * </pre>
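+   *
+   * <p>
+   * A hypothetical round trip (illustrative only), for example to move a plan from the process
+   * that wrote the rfile to the process doing the bulk import:
+   *
+   * <pre>{@code
+   * String json = loadPlan.toJson();
+   * LoadPlan copy = LoadPlan.fromJson(json);
+   * }</pre>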
+ * + * @since 3.1.0 + */ + public String toJson() { + return gson.toJson(new JsonAll(destinations)); + } + + /** + * Deserializes json to a load plan. + * + * @param json produced by {@link #toJson()} + */ + public static LoadPlan fromJson(String json) { + var dests = gson.fromJson(json, JsonAll.class).destinations.stream() + .map(JsonDestination::toDestination).collect(Collectors.toUnmodifiableList()); + return new LoadPlan(dests); + } + + /** + * Represents two split points that exist in a table being bulk imported to. + * + * @since 3.1.0 + */ + public static class TableSplits { + private final Text prevRow; + private final Text endRow; + + public TableSplits(Text prevRow, Text endRow) { + Preconditions.checkArgument( + prevRow == null || endRow == null || prevRow.compareTo(endRow) < 0, "%s >= %s", prevRow, + endRow); + this.prevRow = prevRow; + this.endRow = endRow; + } + + public Text getPrevRow() { + return prevRow; + } + + public Text getEndRow() { + return endRow; + } + + @Override + public boolean equals(Object o) { + if (this == o) { + return true; + } + if (o == null || getClass() != o.getClass()) { + return false; + } + TableSplits that = (TableSplits) o; + return Objects.equals(prevRow, that.prevRow) && Objects.equals(endRow, that.endRow); + } + + @Override + public int hashCode() { + return Objects.hash(prevRow, endRow); + } + + @Override + public String toString() { + return "(" + prevRow + "," + endRow + "]"; + } + } + + /** + * A function that maps a row to two table split points that contain the row. These splits must + * exist in the table being bulk imported to. There is no requirement that the splits are + * contiguous. For example if a table has splits C,D,E,M and we ask for splits containing row H + * its ok to return D,M, but that could result in the file mapping to more actual tablets than + * needed. + * + * @since 3.1.0 + */ + public interface SplitResolver extends Function { + static SplitResolver from(SortedSet splits) { + return row -> { + var headSet = splits.headSet(row); + Text prevRow = headSet.isEmpty() ? null : headSet.last(); + var tailSet = splits.tailSet(row); + Text endRow = tailSet.isEmpty() ? null : tailSet.first(); + return new TableSplits(prevRow, endRow); + }; + } + + /** + * For a given row R this function should find two split points S1 and S2 that exist in the + * table being bulk imported to such that S1 < R <= S2. The closer S1 and S2 are to each + * other the better. + */ + @Override + TableSplits apply(Text row); + } + + /** + * Computes a load plan for a given rfile. This will open the rfile and find every + * {@link TableSplits} that overlaps rows in the file and add those to the returned load plan. + * + * @since 3.1.0 + */ + public static LoadPlan compute(URI file, SplitResolver splitResolver) throws IOException { + return compute(file, Map.of(), splitResolver); + } + + /** + * Computes a load plan for a given rfile. This will open the rfile and find every + * {@link TableSplits} that overlaps rows in the file and add those to the returned load plan. 
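+   *
+   * <p>
+   * A hypothetical sketch (illustrative only; the table name and file URI below are not part of
+   * this API):
+   *
+   * <pre>{@code
+   * SortedSet<Text> splits = new TreeSet<>(client.tableOperations().listSplits("mytable"));
+   * LoadPlan plan = LoadPlan.compute(URI.create("hdfs://nn:8020/import/f1.rf"), Map.of(),
+   *     LoadPlan.SplitResolver.from(splits));
+   * String json = plan.toJson(); // could be saved and later merged via builder().addPlan(...)
+   * }</pre>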
+ * + * @param properties used when opening the rfile, see + * {@link org.apache.accumulo.core.client.rfile.RFile.ScannerOptions#withTableProperties(Map)} + * @since 3.1.0 + */ + public static LoadPlan compute(URI file, Map properties, + SplitResolver splitResolver) throws IOException { + try (var scanner = RFile.newScanner().from(file.toString()).withoutSystemIterators() + .withTableProperties(properties).withIndexCache(10_000_000).build()) { + BulkImport.NextRowFunction nextRowFunction = row -> { + scanner.setRange(new Range(row, null)); + var iter = scanner.iterator(); + if (iter.hasNext()) { + return iter.next().getKey().getRow(); + } else { + return null; + } + }; + + Function rowToExtentResolver = row -> { + var tabletRange = splitResolver.apply(row); + var extent = new KeyExtent(FAKE_ID, tabletRange.endRow, tabletRange.prevRow); + Preconditions.checkState(extent.contains(row), "%s does not contain %s", tabletRange, row); + return extent; + }; + + List overlapping = + BulkImport.findOverlappingTablets(rowToExtentResolver, nextRowFunction); + + Path path = new Path(file); + + var builder = builder(); + for (var extent : overlapping) { + builder.loadFileTo(path.getName(), RangeType.TABLE, extent.prevEndRow(), extent.endRow()); + } + return builder.build(); + } + } } diff --git a/core/src/main/java/org/apache/accumulo/core/file/FileOperations.java b/core/src/main/java/org/apache/accumulo/core/file/FileOperations.java index 5182c614aa8..6a7daefed89 100644 --- a/core/src/main/java/org/apache/accumulo/core/file/FileOperations.java +++ b/core/src/main/java/org/apache/accumulo/core/file/FileOperations.java @@ -55,6 +55,13 @@ public abstract class FileOperations { Set.of(Constants.BULK_LOAD_MAPPING, Constants.BULK_RENAME_FILE, FileOutputCommitter.SUCCEEDED_FILE_NAME, HADOOP_JOBHISTORY_LOCATION); + public static boolean isBulkWorkingFile(String fileName) { + if (fileName.startsWith(Constants.BULK_WORKING_PREFIX)) { + return true; + } + return bulkWorkingFiles.contains(fileName); + } + public static Set getValidExtensions() { return validExtensions; } diff --git a/core/src/test/java/org/apache/accumulo/core/client/rfile/RFileClientTest.java b/core/src/test/java/org/apache/accumulo/core/client/rfile/RFileClientTest.java index 3466026518c..393511e8a26 100644 --- a/core/src/test/java/org/apache/accumulo/core/client/rfile/RFileClientTest.java +++ b/core/src/test/java/org/apache/accumulo/core/client/rfile/RFileClientTest.java @@ -28,6 +28,7 @@ import java.io.File; import java.io.IOException; +import java.net.URI; import java.util.AbstractMap; import java.util.ArrayList; import java.util.Arrays; @@ -39,7 +40,11 @@ import java.util.Map; import java.util.Map.Entry; import java.util.SortedMap; +import java.util.SortedSet; import java.util.TreeMap; +import java.util.TreeSet; +import java.util.stream.Collectors; +import java.util.stream.Stream; import org.apache.accumulo.core.client.IteratorSetting; import org.apache.accumulo.core.client.Scanner; @@ -57,6 +62,8 @@ import org.apache.accumulo.core.data.ArrayByteSequence; import org.apache.accumulo.core.data.ByteSequence; import org.apache.accumulo.core.data.Key; +import org.apache.accumulo.core.data.LoadPlan; +import org.apache.accumulo.core.data.LoadPlanTest; import org.apache.accumulo.core.data.PartialKey; import org.apache.accumulo.core.data.Range; import org.apache.accumulo.core.data.Value; @@ -938,4 +945,174 @@ public void testMultipleFilesAndCache() throws Exception { assertEquals(testData, toMap(scanner)); scanner.close(); } + + @Test + public void 
testLoadPlanEmpty() throws Exception { +    LocalFileSystem localFs = FileSystem.getLocal(new Configuration()); + +    LoadPlan.SplitResolver splitResolver = +        LoadPlan.SplitResolver.from(new TreeSet<>(List.of(new Text("m")))); + +    for (boolean withSplits : List.of(true, false)) { +      String testFile = createTmpTestFile(); +      var builder = RFile.newWriter().to(testFile).withFileSystem(localFs); +      if (withSplits) { +        builder = builder.withSplitResolver(splitResolver); +      } +      var writer = builder.build(); + +      // cannot get load plan before closing the file +      assertThrows(IllegalStateException.class, +          () -> writer.getLoadPlan(new Path(testFile).getName())); + +      try (writer) { +        writer.startDefaultLocalityGroup(); +        assertThrows(IllegalStateException.class, +            () -> writer.getLoadPlan(new Path(testFile).getName())); +      } +      var loadPlan = writer.getLoadPlan(new Path(testFile).getName()); +      assertEquals(0, loadPlan.getDestinations().size()); + +      loadPlan = LoadPlan.compute(new URI(testFile), splitResolver); +      assertEquals(0, loadPlan.getDestinations().size()); +    } +  } + +  @Test +  public void testLoadPlanLocalityGroupsNoSplits() throws Exception { +    LocalFileSystem localFs = FileSystem.getLocal(new Configuration()); + +    String testFile = createTmpTestFile(); +    var writer = RFile.newWriter().to(testFile).withFileSystem(localFs).build(); +    try (writer) { +      writer.startNewLocalityGroup("LG1", "F1"); +      writer.append(new Key("001", "F1"), "V1"); +      writer.append(new Key("005", "F1"), "V2"); +      writer.startNewLocalityGroup("LG2", "F3"); +      writer.append(new Key("003", "F3"), "V3"); +      writer.append(new Key("004", "F3"), "V4"); +      writer.startDefaultLocalityGroup(); +      writer.append(new Key("007", "F4"), "V5"); +      writer.append(new Key("009", "F4"), "V6"); +    } + +    var filename = new Path(testFile).getName(); +    var loadPlan = writer.getLoadPlan(filename); +    assertEquals(1, loadPlan.getDestinations().size()); + +    // The minimum and maximum rows happened in different locality groups; the load plan should +    // reflect this +    var expectedLoadPlan = +        LoadPlan.builder().loadFileTo(filename, LoadPlan.RangeType.FILE, "001", "009").build(); +    assertEquals(expectedLoadPlan.toJson(), loadPlan.toJson()); +  } + +  @Test +  public void testLoadPlanLocalityGroupsSplits() throws Exception { +    LocalFileSystem localFs = FileSystem.getLocal(new Configuration()); + +    SortedSet<Text> splits = +        Stream.of("001", "002", "003", "004", "005", "006", "007", "008", "009").map(Text::new) +            .collect(Collectors.toCollection(TreeSet::new)); +    var splitResolver = LoadPlan.SplitResolver.from(splits); + +    String testFile = createTmpTestFile(); +    var writer = RFile.newWriter().to(testFile).withFileSystem(localFs) +        .withSplitResolver(splitResolver).build(); +    try (writer) { +      writer.startNewLocalityGroup("LG1", "F1"); +      writer.append(new Key("001", "F1"), "V1"); +      writer.append(new Key("005", "F1"), "V2"); +      writer.startNewLocalityGroup("LG2", "F3"); +      writer.append(new Key("003", "F3"), "V3"); +      writer.append(new Key("005", "F3"), "V3"); +      writer.append(new Key("007", "F3"), "V4"); +      writer.startDefaultLocalityGroup(); +      writer.append(new Key("007", "F4"), "V5"); +      writer.append(new Key("009", "F4"), "V6"); +    } + +    var filename = new Path(testFile).getName(); +    var loadPlan = writer.getLoadPlan(filename); +    assertEquals(5, loadPlan.getDestinations().size()); + +    var builder = LoadPlan.builder(); +    builder.loadFileTo(filename, LoadPlan.RangeType.TABLE, null, "001"); +    builder.loadFileTo(filename, LoadPlan.RangeType.TABLE, "004", "005"); +    builder.loadFileTo(filename,
LoadPlan.RangeType.TABLE, "002", "003"); +    builder.loadFileTo(filename, LoadPlan.RangeType.TABLE, "006", "007"); +    builder.loadFileTo(filename, LoadPlan.RangeType.TABLE, "008", "009"); +    assertEquals(LoadPlanTest.toString(builder.build().getDestinations()), +        LoadPlanTest.toString(loadPlan.getDestinations())); + +    loadPlan = LoadPlan.compute(new URI(testFile), splitResolver); +    assertEquals(LoadPlanTest.toString(builder.build().getDestinations()), +        LoadPlanTest.toString(loadPlan.getDestinations())); +  } + +  @Test +  public void testIncorrectSplitResolver() throws Exception { +    // for some rows the returned table splits will not contain the row. This should cause an error. +    LoadPlan.SplitResolver splitResolver = +        row -> new LoadPlan.TableSplits(new Text("003"), new Text("005")); + +    LocalFileSystem localFs = FileSystem.getLocal(new Configuration()); + +    String testFile = createTmpTestFile(); +    var writer = RFile.newWriter().to(testFile).withFileSystem(localFs) +        .withSplitResolver(splitResolver).build(); +    try (writer) { +      writer.startDefaultLocalityGroup(); +      writer.append(new Key("004", "F4"), "V2"); +      var e = assertThrows(IllegalStateException.class, +          () -> writer.append(new Key("007", "F4"), "V2")); +      assertTrue(e.getMessage().contains("(003,005]")); +      assertTrue(e.getMessage().contains("007")); +    } + +    var testFile2 = createTmpTestFile(); +    var writer2 = RFile.newWriter().to(testFile2).withFileSystem(localFs).build(); +    try (writer2) { +      writer2.startDefaultLocalityGroup(); +      writer2.append(new Key("004", "F4"), "V2"); +      writer2.append(new Key("007", "F4"), "V2"); +    } + +    var e = assertThrows(IllegalStateException.class, +        () -> LoadPlan.compute(new URI(testFile), splitResolver)); +    assertTrue(e.getMessage().contains("(003,005]")); +    assertTrue(e.getMessage().contains("007")); +  } + +  @Test +  public void testGetLoadPlanBeforeClose() throws Exception { +    LocalFileSystem localFs = FileSystem.getLocal(new Configuration()); + +    String testFile = createTmpTestFile(); +    var writer = RFile.newWriter().to(testFile).withFileSystem(localFs).build(); +    try (writer) { +      var e = assertThrows(IllegalStateException.class, +          () -> writer.getLoadPlan(new Path(testFile).getName())); +      assertEquals("Attempted to get load plan before closing", e.getMessage()); +      writer.startDefaultLocalityGroup(); +      writer.append(new Key("004", "F4"), "V2"); +      var e2 = assertThrows(IllegalStateException.class, +          () -> writer.getLoadPlan(new Path(testFile).getName())); +      assertEquals("Attempted to get load plan before closing", e2.getMessage()); +    } +  } + +  @Test +  public void testGetLoadPlanWithPath() throws Exception { +    LocalFileSystem localFs = FileSystem.getLocal(new Configuration()); + +    String testFile = createTmpTestFile(); +    var writer = RFile.newWriter().to(testFile).withFileSystem(localFs).build(); +    writer.close(); + +    var e = +        assertThrows(IllegalArgumentException.class, () -> writer.getLoadPlan(testFile.toString())); +    assertTrue(e.getMessage().contains("Unexpected path")); +    assertEquals(0, writer.getLoadPlan(new Path(testFile).getName()).getDestinations().size()); +  } } diff --git a/core/src/test/java/org/apache/accumulo/core/data/LoadPlanTest.java b/core/src/test/java/org/apache/accumulo/core/data/LoadPlanTest.java index bba30a555d3..18f2038163d 100644 --- a/core/src/test/java/org/apache/accumulo/core/data/LoadPlanTest.java +++ b/core/src/test/java/org/apache/accumulo/core/data/LoadPlanTest.java @@ -23,8 +23,11 @@ import static org.junit.jupiter.api.Assertions.assertEquals; import static
org.junit.jupiter.api.Assertions.assertThrows; +import java.util.Base64; +import java.util.Collection; import java.util.HashSet; import java.util.Set; +import java.util.stream.Collectors; import org.apache.accumulo.core.data.LoadPlan.Destination; import org.apache.accumulo.core.data.LoadPlan.RangeType; @@ -100,6 +103,47 @@ public void testTypes() { assertEquals(expected, actual); + var loadPlan2 = LoadPlan.fromJson(loadPlan.toJson()); + Set actual2 = + loadPlan2.getDestinations().stream().map(LoadPlanTest::toString).collect(toSet()); + assertEquals(expected, actual2); + } + + @Test + public void testJson() { + var loadPlan = LoadPlan.builder().build(); + assertEquals(0, loadPlan.getDestinations().size()); + assertEquals("{\"destinations\":[]}", loadPlan.toJson()); + + var builder = LoadPlan.builder(); + builder.loadFileTo("f1.rf", RangeType.TABLE, null, "003"); + builder.loadFileTo("f2.rf", RangeType.FILE, "004", "007"); + builder.loadFileTo("f1.rf", RangeType.TABLE, "005", "006"); + builder.loadFileTo("f3.rf", RangeType.TABLE, "008", null); + String json = builder.build().toJson(); + + String b64003 = Base64.getUrlEncoder().encodeToString("003".getBytes(UTF_8)); + String b64004 = Base64.getUrlEncoder().encodeToString("004".getBytes(UTF_8)); + String b64005 = Base64.getUrlEncoder().encodeToString("005".getBytes(UTF_8)); + String b64006 = Base64.getUrlEncoder().encodeToString("006".getBytes(UTF_8)); + String b64007 = Base64.getUrlEncoder().encodeToString("007".getBytes(UTF_8)); + String b64008 = Base64.getUrlEncoder().encodeToString("008".getBytes(UTF_8)); + + String expected = "{'destinations':[{'fileName':'f1.rf','startRow':null,'endRow':'" + b64003 + + "','rangeType':'TABLE'},{'fileName':'f2.rf','startRow':'" + b64004 + "','endRow':'" + + b64007 + "','rangeType':'FILE'},{'fileName':'f1.rf','startRow':'" + b64005 + + "','endRow':'" + b64006 + "','rangeType':'TABLE'},{'fileName':'f3.rf','startRow':'" + + b64008 + "','endRow':null,'rangeType':'TABLE'}]}"; + + assertEquals(expected.replace("'", "\""), json); + } + + @Test + public void testTableSplits() { + assertThrows(IllegalArgumentException.class, + () -> new LoadPlan.TableSplits(new Text("004"), new Text("004"))); + assertThrows(IllegalArgumentException.class, + () -> new LoadPlan.TableSplits(new Text("004"), new Text("003"))); } private static String toString(Destination d) { @@ -110,4 +154,8 @@ private static String toString(Destination d) { private static String toString(byte[] r) { return r == null ? 
null : new String(r, UTF_8); } + + public static Set toString(Collection destinations) { + return destinations.stream().map(d -> toString(d)).collect(Collectors.toSet()); + } } diff --git a/core/src/test/java/org/apache/accumulo/core/file/rfile/RFileTest.java b/core/src/test/java/org/apache/accumulo/core/file/rfile/RFileTest.java index 81c76d58ff6..38b2b954342 100644 --- a/core/src/test/java/org/apache/accumulo/core/file/rfile/RFileTest.java +++ b/core/src/test/java/org/apache/accumulo/core/file/rfile/RFileTest.java @@ -207,7 +207,6 @@ static String formatString(String prefix, int i) { public void test1() throws IOException { // test an empty file - TestRFile trf = new TestRFile(conf); trf.openWriter(); diff --git a/test/src/main/java/org/apache/accumulo/test/functional/BulkNewIT.java b/test/src/main/java/org/apache/accumulo/test/functional/BulkNewIT.java index ee9d418b68d..ee90d4841f0 100644 --- a/test/src/main/java/org/apache/accumulo/test/functional/BulkNewIT.java +++ b/test/src/main/java/org/apache/accumulo/test/functional/BulkNewIT.java @@ -30,6 +30,7 @@ import java.io.FileNotFoundException; import java.io.IOException; import java.math.BigInteger; +import java.net.URI; import java.nio.file.Files; import java.nio.file.Paths; import java.security.MessageDigest; @@ -88,6 +89,7 @@ import org.apache.accumulo.server.constraints.MetadataConstraints; import org.apache.accumulo.server.constraints.SystemEnvironment; import org.apache.accumulo.test.util.Wait; +import org.apache.commons.io.IOUtils; import org.apache.hadoop.conf.Configuration; import org.apache.hadoop.fs.FSDataOutputStream; import org.apache.hadoop.fs.FileSystem; @@ -471,6 +473,63 @@ public void testBadLoadPlans() throws Exception { } } + @Test + public void testComputeLoadPlan() throws Exception { + + try (AccumuloClient c = Accumulo.newClient().from(getClientProps()).build()) { + addSplits(c, tableName, "0333 0666 0999 1333 1666"); + + String dir = getDir("/testBulkFile-"); + + Map> hashes = new HashMap<>(); + String h1 = writeData(dir + "/f1.", aconf, 0, 333); + hashes.put("0333", new HashSet<>(List.of(h1))); + String h2 = writeData(dir + "/f2.", aconf, 0, 666); + hashes.get("0333").add(h2); + hashes.put("0666", new HashSet<>(List.of(h2))); + String h3 = writeData(dir + "/f3.", aconf, 334, 700); + hashes.get("0666").add(h3); + hashes.put("0999", new HashSet<>(List.of(h3))); + hashes.put("1333", Set.of()); + hashes.put("1666", Set.of()); + hashes.put("null", Set.of()); + + SortedSet splits = new TreeSet<>(c.tableOperations().listSplits(tableName)); + + for (String filename : List.of("f1.rf", "f2.rf", "f3.rf")) { + // The body of this loop simulates what each reducer would do + Path path = new Path(dir + "/" + filename); + + // compute the load plan for the rfile + URI file = path.toUri(); + String lpJson = LoadPlan.compute(file, LoadPlan.SplitResolver.from(splits)).toJson(); + + // save the load plan to a file + Path lpPath = new Path(path.getParent(), path.getName().replace(".rf", ".lp")); + try (var output = getCluster().getFileSystem().create(lpPath, false)) { + IOUtils.write(lpJson, output, UTF_8); + } + } + + // This simulates the code that would run after the map reduce job and bulk import the files + var builder = LoadPlan.builder(); + for (var status : getCluster().getFileSystem().listStatus(new Path(dir), + p -> p.getName().endsWith(".lp"))) { + try (var input = getCluster().getFileSystem().open(status.getPath())) { + String lpJson = IOUtils.toString(input, UTF_8); + builder.addPlan(LoadPlan.fromJson(lpJson)); + } + } + 
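+      // combine the per-file load plans read back from the .lp files into a single plan for the bulk import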
+ LoadPlan lpAll = builder.build(); + + c.tableOperations().importDirectory(dir).to(tableName).plan(lpAll).load(); + + verifyData(c, tableName, 0, 700, false); + verifyMetadata(c, tableName, hashes); + } + } + @Test public void testEmptyDir() throws Exception { try (AccumuloClient c = Accumulo.newClient().from(getClientProps()).build()) { @@ -616,7 +675,7 @@ private void verifyMetadata(AccumuloClient client, String tableName, String endRow = tablet.getEndRow() == null ? "null" : tablet.getEndRow().toString(); - assertEquals(expectedHashes.get(endRow), fileHashes); + assertEquals(expectedHashes.get(endRow), fileHashes, "endRow " + endRow); endRowsSeen.add(endRow); }