From 02daa79cdc62641d5dfeebf7a0965b87b1efd4d3 Mon Sep 17 00:00:00 2001
From: Keith Turner
Date: Mon, 30 Sep 2024 17:13:01 +0000
Subject: [PATCH 1/6] Offers new ways to compute bulk import load plans.

This change offers two new ways of computing bulk import load plans. First, the RFile API was modified to support computing a LoadPlan as the RFile is written. Second, a new LoadPlan.compute() method was added that creates a LoadPlan from an existing RFile. In addition, methods were added to LoadPlan that support serializing and deserializing load plans to/from json.

Together these changes support computing load plans in a distributed manner. For example, with a bulk import directory of N files the following use case is now supported.

1. For each file, a task is spun up on a remote server that calls the new LoadPlan.compute() API to determine what tablets the file overlaps. Then the new LoadPlan.toJson() method is called to serialize the load plan and send it to a central place.

2. All the load plans from the remote servers are deserialized by calling the new LoadPlan.fromJson() method and merged into a single load plan that is used to do the bulk import.

Another use case these new APIs could support is running this new code in the map reduce job that generates bulk import data.

1. Each reducer, as it writes to an rfile, could also build a LoadPlan. After the rfile is closed, a load plan can be obtained from it, serialized using LoadPlan.toJson(), and the result saved to a file. So after the map reduce job completes, each rfile would have a corresponding file containing its load plan.

2. Another process that runs after the map reduce job can load all the load plans from those files and merge them using the new LoadPlan.fromJson() method. The merged LoadPlan can then be used to do the bulk import.

BulkNewIT.testComputeLoadPlan() simulates this map reduce use case by going through, in code, the steps a map reduce job would. This tests the new APIs and shows what using them would look like.

Both of these use cases avoid analyzing the files on the single machine doing the bulk import. Bulk import V1 had this functionality and would ask random tservers to do the file analysis, which could cause unexpected load on those tservers. Bulk V1 also interleaved analyzing files and adding them to tablets, which could lead to odd situations where analysis fails partway through and a file is left imported to only some of its tablets. Bulk v2 does all analysis before any files are added to tablets; however, it lacks this distributed analysis capability. These changes provide the building blocks for bulk v2 to do the distributed analysis that bulk v1 did.
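Below is a minimal sketch of this distributed flow using only the APIs added here. The helper names and the transport that moves the json strings between processes are illustrative, not part of this change.

    import java.io.IOException;
    import java.net.URI;
    import java.util.List;
    import java.util.SortedSet;

    import org.apache.accumulo.core.data.LoadPlan;
    import org.apache.hadoop.io.Text;

    public class DistributedLoadPlanSketch {
      // Runs on each remote server: compute a load plan for one bulk import
      // file and serialize it so it can be sent to a central place.
      static String computePlanJson(URI rfile, SortedSet<Text> tableSplits) throws IOException {
        LoadPlan plan = LoadPlan.compute(rfile, LoadPlan.SplitResolver.from(tableSplits));
        return plan.toJson();
      }

      // Runs at the central place: merge the collected json plans into the
      // single plan that is handed to importDirectory().
      static LoadPlan mergePlans(List<String> planJsons) {
        var builder = LoadPlan.builder();
        for (String json : planJsons) {
          builder.addPlan(LoadPlan.fromJson(json));
        }
        return builder.build();
      }
    }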
--- .../core/client/rfile/LoadPlanCollector.java | 130 ++++++++++ .../accumulo/core/client/rfile/RFile.java | 10 + .../core/client/rfile/RFileWriter.java | 28 ++- .../core/client/rfile/RFileWriterBuilder.java | 20 +- .../core/clientImpl/bulk/BulkImport.java | 35 ++- .../apache/accumulo/core/data/LoadPlan.java | 225 ++++++++++++++++++ .../core/client/rfile/RFileClientTest.java | 154 ++++++++++++ .../accumulo/core/data/LoadPlanTest.java | 48 ++++ .../accumulo/core/file/rfile/RFileTest.java | 1 - .../accumulo/test/functional/BulkNewIT.java | 61 ++++- 10 files changed, 698 insertions(+), 14 deletions(-) create mode 100644 core/src/main/java/org/apache/accumulo/core/client/rfile/LoadPlanCollector.java diff --git a/core/src/main/java/org/apache/accumulo/core/client/rfile/LoadPlanCollector.java b/core/src/main/java/org/apache/accumulo/core/client/rfile/LoadPlanCollector.java new file mode 100644 index 00000000000..1ee4c8b7133 --- /dev/null +++ b/core/src/main/java/org/apache/accumulo/core/client/rfile/LoadPlanCollector.java @@ -0,0 +1,130 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * https://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. 
+ */ +package org.apache.accumulo.core.client.rfile; + +import java.util.HashSet; +import java.util.Set; + +import org.apache.accumulo.core.data.Key; +import org.apache.accumulo.core.data.LoadPlan; +import org.apache.accumulo.core.data.TableId; +import org.apache.accumulo.core.dataImpl.KeyExtent; +import org.apache.hadoop.io.Text; + +import com.google.common.base.Preconditions; + +class LoadPlanCollector { + + private final LoadPlan.SplitResolver splitResolver; + private boolean finished = false; + private Text lgFirstRow; + private Text lgLastRow; + private Text firstRow; + private Text lastRow; + private Set<KeyExtent> overlappingExtents; + private KeyExtent currentExtent; + private long appended = 0; + + LoadPlanCollector(LoadPlan.SplitResolver splitResolver) { + this.splitResolver = splitResolver; + this.overlappingExtents = new HashSet<>(); + } + + LoadPlanCollector() { + splitResolver = null; + this.overlappingExtents = null; + + } + + private void appendNoSplits(Key key) { + if (lgFirstRow == null) { + lgFirstRow = key.getRow(); + lgLastRow = lgFirstRow; + } else { + var row = key.getRow(); + lgLastRow = row; + } + } + + private static final TableId FAKE_ID = TableId.of("123"); + + private void appendSplits(Key key) { + var row = key.getRow(); + if (currentExtent == null || !currentExtent.contains(row)) { + var tableSplits = splitResolver.apply(row); + var extent = new KeyExtent(FAKE_ID, tableSplits.getEndRow(), tableSplits.getPrevRow()); + Preconditions.checkState(extent.contains(row), "%s does not contain %s", tableSplits, row); + if (currentExtent != null) { + overlappingExtents.add(currentExtent); + } + currentExtent = extent; + } + } + + public void append(Key key) { + if (splitResolver == null) { + appendNoSplits(key); + } else { + appendSplits(key); + } + appended++; + } + + public void startLocalityGroup() { + if (lgFirstRow != null) { + if (firstRow == null) { + firstRow = lgFirstRow; + lastRow = lgLastRow; + } else { + // take the minimum + firstRow = firstRow.compareTo(lgFirstRow) < 0 ? firstRow : lgFirstRow; + // take the maximum + lastRow = lastRow.compareTo(lgLastRow) > 0 ?
lastRow : lgLastRow; + } + lgFirstRow = null; + lgLastRow = null; + } + } + + public LoadPlan getLoadPlan(String filename) { + if (appended == 0) { + return LoadPlan.builder().build(); + } + + Preconditions.checkState(finished, "Attempted to get load plan before closing"); + if (splitResolver == null) { + return LoadPlan.builder().loadFileTo(filename, LoadPlan.RangeType.FILE, firstRow, lastRow) + .build(); + } else { + var builder = LoadPlan.builder(); + overlappingExtents.add(currentExtent); + for (var extent : overlappingExtents) { + builder.loadFileTo(filename, LoadPlan.RangeType.TABLE, extent.prevEndRow(), + extent.endRow()); + } + return builder.build(); + } + } + + public void close() { + finished = true; + // compute the overall min and max rows + startLocalityGroup(); + } +} diff --git a/core/src/main/java/org/apache/accumulo/core/client/rfile/RFile.java b/core/src/main/java/org/apache/accumulo/core/client/rfile/RFile.java index 64956dcbc20..8f96643143b 100644 --- a/core/src/main/java/org/apache/accumulo/core/client/rfile/RFile.java +++ b/core/src/main/java/org/apache/accumulo/core/client/rfile/RFile.java @@ -35,6 +35,7 @@ import org.apache.accumulo.core.client.summary.Summary.FileStatistics; import org.apache.accumulo.core.conf.Property; import org.apache.accumulo.core.data.Key; +import org.apache.accumulo.core.data.LoadPlan; import org.apache.accumulo.core.data.Range; import org.apache.accumulo.core.security.Authorizations; import org.apache.accumulo.core.util.RowRangeUtil; @@ -463,6 +464,15 @@ default WriterOptions withSummarizers(SummarizerConfiguration... summarizerConf) */ WriterOptions withVisibilityCacheSize(int maxSize); + /** + * @param splitResolver builds a {@link LoadPlan} using table split points provided by the given + * splitResolver. + * @return this + * @see RFileWriter#getLoadPlan(String) + * @since 3.1.0 + */ + WriterOptions withSplitResolver(LoadPlan.SplitResolver splitResolver); + /** * @return a new RfileWriter created with the options previously specified. */ diff --git a/core/src/main/java/org/apache/accumulo/core/client/rfile/RFileWriter.java b/core/src/main/java/org/apache/accumulo/core/client/rfile/RFileWriter.java index b4d6def4a23..819dbb34d8d 100644 --- a/core/src/main/java/org/apache/accumulo/core/client/rfile/RFileWriter.java +++ b/core/src/main/java/org/apache/accumulo/core/client/rfile/RFileWriter.java @@ -29,6 +29,7 @@ import org.apache.accumulo.core.data.ArrayByteSequence; import org.apache.accumulo.core.data.ByteSequence; import org.apache.accumulo.core.data.Key; +import org.apache.accumulo.core.data.LoadPlan; import org.apache.accumulo.core.data.Value; import org.apache.accumulo.core.file.FileSKVWriter; import org.apache.accumulo.core.security.ColumnVisibility; @@ -92,12 +93,17 @@ public class RFileWriter implements AutoCloseable { private final FileSKVWriter writer; private final LRUMap validVisibilities; + + // TODO should be able to completely remove this as lower level code is already doing some things + // like tracking first and last keys per LG. Added to get simple initial impl before optimizing. 
+ private final LoadPlanCollector loadPlanCollector; private boolean startedLG; private boolean startedDefaultLG; - RFileWriter(FileSKVWriter fileSKVWriter, int visCacheSize) { + RFileWriter(FileSKVWriter fileSKVWriter, int visCacheSize, LoadPlanCollector loadPlanCollector) { this.writer = fileSKVWriter; this.validVisibilities = new LRUMap<>(visCacheSize); + this.loadPlanCollector = loadPlanCollector; } private void _startNewLocalityGroup(String name, Set columnFamilies) @@ -106,6 +112,7 @@ private void _startNewLocalityGroup(String name, Set columnFamilie "Cannot start a locality group after starting the default locality group"); writer.startNewLocalityGroup(name, columnFamilies); startedLG = true; + loadPlanCollector.startLocalityGroup(); } /** @@ -175,6 +182,7 @@ public void startNewLocalityGroup(String name, String... families) throws IOExce public void startDefaultLocalityGroup() throws IOException { Preconditions.checkState(!startedDefaultLG); + loadPlanCollector.startLocalityGroup(); writer.startDefaultLocalityGroup(); startedDefaultLG = true; startedLG = true; @@ -204,6 +212,7 @@ public void append(Key key, Value val) throws IOException { validVisibilities.put(new ArrayByteSequence(Arrays.copyOf(cv, cv.length)), Boolean.TRUE); } writer.append(key, val); + loadPlanCollector.append(key); } /** @@ -250,5 +259,22 @@ public void append(Iterable> keyValues) throws IOException { @Override public void close() throws IOException { writer.close(); + loadPlanCollector.close(); + } + + /** + * If no split resolver was provided when the RFileWriter was built then this method will return a + * simple load plan of type {@link org.apache.accumulo.core.data.LoadPlan.RangeType#FILE} using + * the first and last row seen. If a splitResolver was provided then this will return a load plan + * of type {@link org.apache.accumulo.core.data.LoadPlan.RangeType#TABLE} that has the split + * ranges the rows written overlapped. + * + * @param filename + * @return load plan computed from the keys written to the rfile. 
+ * @see org.apache.accumulo.core.client.rfile.RFile.WriterOptions#withSplitResolver(LoadPlan.SplitResolver) + * @since 3.1.0 + */ + public LoadPlan getLoadPlan(String filename) { + return loadPlanCollector.getLoadPlan(filename); } } diff --git a/core/src/main/java/org/apache/accumulo/core/client/rfile/RFileWriterBuilder.java b/core/src/main/java/org/apache/accumulo/core/client/rfile/RFileWriterBuilder.java index b1d79573380..70428345b5c 100644 --- a/core/src/main/java/org/apache/accumulo/core/client/rfile/RFileWriterBuilder.java +++ b/core/src/main/java/org/apache/accumulo/core/client/rfile/RFileWriterBuilder.java @@ -37,6 +37,7 @@ import org.apache.accumulo.core.conf.ConfigurationCopy; import org.apache.accumulo.core.conf.DefaultConfiguration; import org.apache.accumulo.core.crypto.CryptoFactoryLoader; +import org.apache.accumulo.core.data.LoadPlan; import org.apache.accumulo.core.file.FileOperations; import org.apache.accumulo.core.metadata.UnreferencedTabletFile; import org.apache.accumulo.core.metadata.ValidationUtil; @@ -73,6 +74,7 @@ OutputStream getOutputStream() { private int visCacheSize = 1000; private Map samplerProps = Collections.emptyMap(); private Map summarizerProps = Collections.emptyMap(); + private LoadPlan.SplitResolver splitResolver; private void checkDisjoint(Map props, Map derivedProps, String kind) { @@ -107,6 +109,13 @@ public RFileWriter build() throws IOException { CryptoService cs = CryptoFactoryLoader.getServiceForClient(CryptoEnvironment.Scope.TABLE, tableConfig); + LoadPlanCollector loadPlanCollector; + if (splitResolver != null) { + loadPlanCollector = new LoadPlanCollector(splitResolver); + } else { + loadPlanCollector = new LoadPlanCollector(); + } + if (out.getOutputStream() != null) { FSDataOutputStream fsdo; if (out.getOutputStream() instanceof FSDataOutputStream) { @@ -117,12 +126,13 @@ public RFileWriter build() throws IOException { return new RFileWriter( fileops.newWriterBuilder().forOutputStream(".rf", fsdo, out.getConf(), cs) .withTableConfiguration(acuconf).withStartDisabled().build(), - visCacheSize); + visCacheSize, loadPlanCollector); } else { return new RFileWriter(fileops.newWriterBuilder() .forFile(UnreferencedTabletFile.of(out.getFileSystem(), out.path), out.getFileSystem(), out.getConf(), cs) - .withTableConfiguration(acuconf).withStartDisabled().build(), visCacheSize); + .withTableConfiguration(acuconf).withStartDisabled().build(), visCacheSize, + loadPlanCollector); } } @@ -174,6 +184,12 @@ public WriterOptions withVisibilityCacheSize(int maxSize) { return this; } + @Override + public WriterOptions withSplitResolver(LoadPlan.SplitResolver splitResolver) { + this.splitResolver = splitResolver; + return this; + } + @Override public WriterOptions withSummarizers(SummarizerConfiguration... 
summarizerConf) { Objects.requireNonNull(summarizerConf); diff --git a/core/src/main/java/org/apache/accumulo/core/clientImpl/bulk/BulkImport.java b/core/src/main/java/org/apache/accumulo/core/clientImpl/bulk/BulkImport.java index a5e7ff94b5f..d85c19ac548 100644 --- a/core/src/main/java/org/apache/accumulo/core/clientImpl/bulk/BulkImport.java +++ b/core/src/main/java/org/apache/accumulo/core/clientImpl/bulk/BulkImport.java @@ -47,6 +47,7 @@ import java.util.concurrent.ExecutionException; import java.util.concurrent.Executor; import java.util.concurrent.ExecutorService; +import java.util.function.Function; import java.util.stream.Stream; import org.apache.accumulo.core.Constants; @@ -313,19 +314,25 @@ public interface KeyExtentCache { KeyExtent lookup(Text row); } - public static List<KeyExtent> findOverlappingTablets(KeyExtentCache extentCache, - FileSKVIterator reader) throws IOException { + /** + * Function that will find a row in a file being bulk imported that is >= the row passed to the + * function. If there is no row then it should return null. + */ + public interface NextRowFunction { + Text apply(Text row) throws IOException; + } + + public static List<KeyExtent> findOverlappingTablets(Function<Text,KeyExtent> rowToExtentResolver, + NextRowFunction nextRowFunction) throws IOException { List<KeyExtent> result = new ArrayList<>(); - Collection<ByteSequence> columnFamilies = Collections.emptyList(); Text row = new Text(); while (true) { - reader.seek(new Range(row, null), columnFamilies, false); - if (!reader.hasTop()) { + row = nextRowFunction.apply(row); + if (row == null) { break; } - row = reader.getTopKey().getRow(); - KeyExtent extent = extentCache.lookup(row); + KeyExtent extent = rowToExtentResolver.apply(row); result.add(extent); row = extent.endRow(); if (row != null) { @@ -345,12 +352,22 @@ private static Text nextRow(Text row) { } public static List<KeyExtent> findOverlappingTablets(ClientContext context, - KeyExtentCache extentCache, UnreferencedTabletFile file, FileSystem fs, + KeyExtentCache keyExtentCache, UnreferencedTabletFile file, FileSystem fs, Cache<String,Long> fileLenCache, CryptoService cs) throws IOException { try (FileSKVIterator reader = FileOperations.getInstance().newReaderBuilder() .forFile(file, fs, fs.getConf(), cs).withTableConfiguration(context.getConfiguration()) .withFileLenCache(fileLenCache).seekToBeginning().build()) { - return findOverlappingTablets(extentCache, reader); + + Collection<ByteSequence> columnFamilies = Collections.emptyList(); + NextRowFunction nextRowFunction = row -> { + reader.seek(new Range(row, null), columnFamilies, false); + if (!reader.hasTop()) { + return null; + } + return reader.getTopKey().getRow(); + }; + + return findOverlappingTablets(keyExtentCache::lookup, nextRowFunction); } } diff --git a/core/src/main/java/org/apache/accumulo/core/data/LoadPlan.java b/core/src/main/java/org/apache/accumulo/core/data/LoadPlan.java index 311042d778e..6931dd4251f 100644 --- a/core/src/main/java/org/apache/accumulo/core/data/LoadPlan.java +++ b/core/src/main/java/org/apache/accumulo/core/data/LoadPlan.java @@ -20,17 +20,31 @@ import static java.nio.charset.StandardCharsets.UTF_8; +import java.io.IOException; +import java.net.URI; import java.nio.file.Paths; import java.util.Arrays; +import java.util.Base64; import java.util.Collection; import java.util.List; +import java.util.Map; +import java.util.Objects; +import java.util.SortedSet; +import java.util.function.Function; +import java.util.stream.Collectors; import org.apache.accumulo.core.client.admin.TableOperations.ImportMappingOptions; +import
org.apache.accumulo.core.client.rfile.RFile; +import org.apache.accumulo.core.clientImpl.bulk.BulkImport; +import org.apache.accumulo.core.dataImpl.KeyExtent; +import org.apache.hadoop.fs.Path; import org.apache.hadoop.io.Text; import com.google.common.base.Preconditions; import com.google.common.collect.ImmutableList; import com.google.common.primitives.UnsignedBytes; +import com.google.gson.Gson; +import com.google.gson.GsonBuilder; import edu.umd.cs.findbugs.annotations.SuppressFBWarnings; @@ -228,4 +242,215 @@ public LoadPlan build() { } }; } + + private static final TableId FAKE_ID = TableId.of("999"); + + private static class JsonDestination { + String fileName; + String startRow; + String endRow; + RangeType rangeType; + + JsonDestination() {} + + JsonDestination(Destination destination) { + fileName = destination.getFileName(); + startRow = destination.getStartRow() == null ? null + : Base64.getUrlEncoder().encodeToString(destination.getStartRow()); + endRow = destination.getEndRow() == null ? null + : Base64.getUrlEncoder().encodeToString(destination.getEndRow()); + rangeType = destination.getRangeType(); + } + + Destination toDestination() { + return new Destination(fileName, rangeType, + startRow == null ? null : Base64.getUrlDecoder().decode(startRow), + endRow == null ? null : Base64.getUrlDecoder().decode(endRow)); + } + } + + private static final class JsonAll { + List<JsonDestination> destinations; + + JsonAll() {} + + JsonAll(List<Destination> destinations) { + this.destinations = + destinations.stream().map(JsonDestination::new).collect(Collectors.toList()); + } + + } + + private static final Gson gson = new GsonBuilder().disableJdkUnsafe().serializeNulls().create(); + + /** + * Serializes the load plan to json that looks like the following. The values of the startRow and + * endRow fields are base64 encoded using {@link Base64#getUrlEncoder()}. + * + * <pre>
+   * {
+   *   "destinations": [
+   *     {
+   *       "fileName": "f1.rf",
+   *       "startRow": null,
+   *       "endRow": "MDAz",
+   *       "rangeType": "TABLE"
+   *     },
+   *     {
+   *       "fileName": "f2.rf",
+   *       "startRow": "MDA0",
+   *       "endRow": "MDA3",
+   *       "rangeType": "FILE"
+   *     },
+   *     {
+   *       "fileName": "f1.rf",
+   *       "startRow": "MDA1",
+   *       "endRow": "MDA2",
+   *       "rangeType": "TABLE"
+   *     },
+   *     {
+   *       "fileName": "f3.rf",
+   *       "startRow": "MDA4",
+   *       "endRow": null,
+   *       "rangeType": "TABLE"
+   *     }
+   *   ]
+   * }
+   * </pre>
+ * + * @since 3.1.0 + */ + public String toJson() { + return gson.toJson(new JsonAll(destinations)); + } + + /** + * Deserializes json to a load plan. + * + * @param json produced by {@link #toJson()} + */ + public static LoadPlan fromJson(String json) { + var dests = gson.fromJson(json, JsonAll.class).destinations.stream() + .map(JsonDestination::toDestination).collect(Collectors.toUnmodifiableList()); + return new LoadPlan(dests); + } + + /** + * Represents two split points that exist in a table being bulk imported to. + * + * @since 3.1.0 + */ + public static class TableSplits { + private final Text prevRow; + private final Text endRow; + + public TableSplits(Text prevRow, Text endRow) { + Preconditions.checkArgument( + prevRow == null || endRow == null || prevRow.compareTo(endRow) < 0, "%s >= %s", prevRow, + endRow); + this.prevRow = prevRow; + this.endRow = endRow; + } + + public Text getPrevRow() { + return prevRow; + } + + public Text getEndRow() { + return endRow; + } + + @Override + public boolean equals(Object o) { + if (this == o) + return true; + if (o == null || getClass() != o.getClass()) + return false; + TableSplits that = (TableSplits) o; + return Objects.equals(prevRow, that.prevRow) && Objects.equals(endRow, that.endRow); + } + + @Override + public int hashCode() { + return Objects.hash(prevRow, endRow); + } + + @Override + public String toString() { + return "(" + prevRow + "," + endRow + "]"; + } + } + + /** + * A function that maps a row to two table split points that contain the row. These splits must + * exist in the table being bulk imported to. There is no requirement that the splits are + * contiguous. For example if a table has splits C,D,E,M and we ask for splits containing row H + * it's ok to return D,M, but that could result in the file mapping to more actual tablets than + * needed. + * + * @since 3.1.0 + */ + public interface SplitResolver extends Function<Text,TableSplits> { + static SplitResolver from(SortedSet<Text> splits) { + return row -> { + var headSet = splits.headSet(row); + Text prevRow = headSet.isEmpty() ? null : headSet.last(); + var tailSet = splits.tailSet(row); + Text endRow = tailSet.isEmpty() ? null : tailSet.first(); + return new TableSplits(prevRow, endRow); + }; + } + + /** + * For a given row R this function should find two split points S1 and S2 that exist in the + * table being bulk imported to such that S1 < R <= S2. The closer S1 and S2 are to each other + * the better. + */ + @Override + TableSplits apply(Text row); + } + + public static LoadPlan compute(URI file, SplitResolver splitResolver) throws IOException { + return compute(file, Map.of(), splitResolver); + } + + /** + * Computes a load plan for a given rfile. This will open the rfile and find every + * {@link TableSplits} that overlaps rows in the file and add those to the returned load plan.
+ * + * @since 3.1.0 + */ + public static LoadPlan compute(URI file, Map<String,String> properties, + SplitResolver splitResolver) throws IOException { + try (var scanner = RFile.newScanner().from(file.toString()).withoutSystemIterators() + .withTableProperties(properties).withIndexCache(10_000_000).build()) { + BulkImport.NextRowFunction nextRowFunction = row -> { + scanner.setRange(new Range(row, null)); + var iter = scanner.iterator(); + if (iter.hasNext()) { + return iter.next().getKey().getRow(); + } else { + return null; + } + }; + + Function<Text,KeyExtent> rowToExtentResolver = row -> { + var tabletRange = splitResolver.apply(row); + var extent = new KeyExtent(FAKE_ID, tabletRange.endRow, tabletRange.prevRow); + Preconditions.checkState(extent.contains(row), "%s does not contain %s", tabletRange, row); + return extent; + }; + + List<KeyExtent> overlapping = + BulkImport.findOverlappingTablets(rowToExtentResolver, nextRowFunction); + + Path path = new Path(file); + + var builder = builder(); + for (var extent : overlapping) { + builder.loadFileTo(path.getName(), RangeType.TABLE, extent.prevEndRow(), extent.endRow()); + } + return builder.build(); + } + } } diff --git a/core/src/test/java/org/apache/accumulo/core/client/rfile/RFileClientTest.java b/core/src/test/java/org/apache/accumulo/core/client/rfile/RFileClientTest.java index 3466026518c..276126c589e 100644 --- a/core/src/test/java/org/apache/accumulo/core/client/rfile/RFileClientTest.java +++ b/core/src/test/java/org/apache/accumulo/core/client/rfile/RFileClientTest.java @@ -28,6 +28,7 @@ import java.io.File; import java.io.IOException; +import java.net.URI; import java.util.AbstractMap; import java.util.ArrayList; import java.util.Arrays; @@ -39,7 +40,11 @@ import java.util.Map; import java.util.Map.Entry; import java.util.SortedMap; +import java.util.SortedSet; import java.util.TreeMap; +import java.util.TreeSet; +import java.util.stream.Collectors; +import java.util.stream.Stream; import org.apache.accumulo.core.client.IteratorSetting; import org.apache.accumulo.core.client.Scanner; @@ -57,6 +62,8 @@ import org.apache.accumulo.core.data.ArrayByteSequence; import org.apache.accumulo.core.data.ByteSequence; import org.apache.accumulo.core.data.Key; +import org.apache.accumulo.core.data.LoadPlan; +import org.apache.accumulo.core.data.LoadPlanTest; import org.apache.accumulo.core.data.PartialKey; import org.apache.accumulo.core.data.Range; import org.apache.accumulo.core.data.Value; @@ -938,4 +945,151 @@ public void testMultipleFilesAndCache() throws Exception { assertEquals(testData, toMap(scanner)); scanner.close(); } + + @Test + public void testLoadPlanEmpty() throws Exception { + LocalFileSystem localFs = FileSystem.getLocal(new Configuration()); + + LoadPlan.SplitResolver splitResolver = + LoadPlan.SplitResolver.from(new TreeSet<>(List.of(new Text("m")))); + + for (boolean withSplits : List.of(true, false)) { + String testFile = createTmpTestFile(); + var builder = RFile.newWriter().to(testFile).withFileSystem(localFs); + if (withSplits) { + builder = builder.withSplitResolver(splitResolver); + } + var writer = builder.build(); + + try (writer) { + writer.startDefaultLocalityGroup(); + } + var loadPlan = writer.getLoadPlan(new Path(testFile).getName()); + assertEquals(0, loadPlan.getDestinations().size()); + + loadPlan = LoadPlan.compute(new URI(testFile), splitResolver); + assertEquals(0, loadPlan.getDestinations().size()); + } + } + + @Test + public void testLoadPlanLocalityGroupsNoSplits() throws Exception { + LocalFileSystem localFs = FileSystem.getLocal(new
Configuration()); + + String testFile = createTmpTestFile(); + var writer = RFile.newWriter().to(testFile).withFileSystem(localFs).build(); + try (writer) { + writer.startNewLocalityGroup("LG1", "F1"); + writer.append(new Key("001", "F1"), "V1"); + writer.append(new Key("005", "F1"), "V2"); + writer.startNewLocalityGroup("LG2", "F3"); + writer.append(new Key("003", "F3"), "V3"); + writer.append(new Key("004", "F3"), "V4"); + writer.startDefaultLocalityGroup(); + writer.append(new Key("007", "F4"), "V5"); + writer.append(new Key("009", "F4"), "V6"); + } + + var filename = new Path(testFile).getName(); + var loadPlan = writer.getLoadPlan(filename); + assertEquals(1, loadPlan.getDestinations().size()); + + // The minimum and maximum rows happened in different locality groups; the load plan should + // reflect this + var expectedLoadPlan = + LoadPlan.builder().loadFileTo(filename, LoadPlan.RangeType.FILE, "001", "009").build(); + assertEquals(expectedLoadPlan.toJson(), loadPlan.toJson()); + } + + @Test + public void testLoadPlanLocalityGroupsSplits() throws Exception { + LocalFileSystem localFs = FileSystem.getLocal(new Configuration()); + + SortedSet<Text> splits = + Stream.of("001", "002", "003", "004", "005", "006", "007", "008", "009").map(Text::new) + .collect(Collectors.toCollection(TreeSet::new)); + var splitResolver = LoadPlan.SplitResolver.from(splits); + + String testFile = createTmpTestFile(); + var writer = RFile.newWriter().to(testFile).withFileSystem(localFs) + .withSplitResolver(splitResolver).build(); + try (writer) { + writer.startNewLocalityGroup("LG1", "F1"); + writer.append(new Key("001", "F1"), "V1"); + writer.append(new Key("005", "F1"), "V2"); + writer.startNewLocalityGroup("LG2", "F3"); + writer.append(new Key("003", "F3"), "V3"); + writer.append(new Key("005", "F3"), "V3"); + writer.append(new Key("007", "F3"), "V4"); + writer.startDefaultLocalityGroup(); + writer.append(new Key("007", "F4"), "V5"); + writer.append(new Key("009", "F4"), "V6"); + } + + var filename = new Path(testFile).getName(); + var loadPlan = writer.getLoadPlan(filename); + assertEquals(5, loadPlan.getDestinations().size()); + + var builder = LoadPlan.builder(); + builder.loadFileTo(filename, LoadPlan.RangeType.TABLE, null, "001"); + builder.loadFileTo(filename, LoadPlan.RangeType.TABLE, "004", "005"); + builder.loadFileTo(filename, LoadPlan.RangeType.TABLE, "002", "003"); + builder.loadFileTo(filename, LoadPlan.RangeType.TABLE, "006", "007"); + builder.loadFileTo(filename, LoadPlan.RangeType.TABLE, "008", "009"); + assertEquals(LoadPlanTest.toString(builder.build().getDestinations()), + LoadPlanTest.toString(loadPlan.getDestinations())); + + loadPlan = LoadPlan.compute(new URI(testFile), splitResolver); + assertEquals(LoadPlanTest.toString(builder.build().getDestinations()), + LoadPlanTest.toString(loadPlan.getDestinations())); + } + + @Test + public void testIncorrectSplitResolver() throws Exception { + // for some rows the returned table splits will not contain the row. This should cause an error.
+ LoadPlan.SplitResolver splitResolver = + row -> new LoadPlan.TableSplits(new Text("003"), new Text("005")); + + LocalFileSystem localFs = FileSystem.getLocal(new Configuration()); + + String testFile = createTmpTestFile(); + var writer = RFile.newWriter().to(testFile).withFileSystem(localFs) + .withSplitResolver(splitResolver).build(); + try (writer) { + writer.startDefaultLocalityGroup(); + writer.append(new Key("004", "F4"), "V2"); + var e = assertThrows(IllegalStateException.class, + () -> writer.append(new Key("007", "F4"), "V2")); + assertTrue(e.getMessage().contains("(003,005]")); + assertTrue(e.getMessage().contains("007")); + } + + var testFile2 = createTmpTestFile(); + var writer2 = RFile.newWriter().to(testFile2).withFileSystem(localFs).build(); + try (writer2) { + writer2.startDefaultLocalityGroup(); + writer2.append(new Key("004", "F4"), "V2"); + writer2.append(new Key("007", "F4"), "V2"); + } + + var e = assertThrows(IllegalStateException.class, + () -> LoadPlan.compute(new URI(testFile), splitResolver)); + assertTrue(e.getMessage().contains("(003,005]")); + assertTrue(e.getMessage().contains("007")); + } + + @Test + public void testGetLoadPlanBeforeClose() throws Exception { + LocalFileSystem localFs = FileSystem.getLocal(new Configuration()); + + String testFile = createTmpTestFile(); + var writer = RFile.newWriter().to(testFile).withFileSystem(localFs).build(); + try (writer) { + writer.startDefaultLocalityGroup(); + writer.append(new Key("004", "F4"), "V2"); + var e = assertThrows(IllegalStateException.class, + () -> writer.getLoadPlan(new Path(testFile).getName())); + assertEquals("Attempted to get load plan before closing", e.getMessage()); + } + } } diff --git a/core/src/test/java/org/apache/accumulo/core/data/LoadPlanTest.java b/core/src/test/java/org/apache/accumulo/core/data/LoadPlanTest.java index bba30a555d3..a2f9f63feea 100644 --- a/core/src/test/java/org/apache/accumulo/core/data/LoadPlanTest.java +++ b/core/src/test/java/org/apache/accumulo/core/data/LoadPlanTest.java @@ -23,8 +23,11 @@ import static org.junit.jupiter.api.Assertions.assertEquals; import static org.junit.jupiter.api.Assertions.assertThrows; +import java.util.Base64; +import java.util.Collection; import java.util.HashSet; import java.util.Set; +import java.util.stream.Collectors; import org.apache.accumulo.core.data.LoadPlan.Destination; import org.apache.accumulo.core.data.LoadPlan.RangeType; @@ -100,6 +103,47 @@ public void testTypes() { assertEquals(expected, actual); + var loadPlan2 = LoadPlan.fromJson(loadPlan.toJson()); + Set actual2 = + loadPlan2.getDestinations().stream().map(LoadPlanTest::toString).collect(toSet()); + assertEquals(expected, actual2); + } + + @Test + public void testJson() { + var loadPlan = LoadPlan.builder().build(); + assertEquals(0, loadPlan.getDestinations().size()); + assertEquals("{\"destinations\":[]}", loadPlan.toJson()); + + var builder = LoadPlan.builder(); + builder.loadFileTo("f1.rf", RangeType.TABLE, null, "003"); + builder.loadFileTo("f2.rf", RangeType.FILE, "004", "007"); + builder.loadFileTo("f1.rf", RangeType.TABLE, "005", "006"); + builder.loadFileTo("f3.rf", RangeType.TABLE, "008", null); + String json = builder.build().toJson(); + + String b64003 = Base64.getUrlEncoder().encodeToString("003".getBytes(UTF_8)); + String b64004 = Base64.getUrlEncoder().encodeToString("004".getBytes(UTF_8)); + String b64005 = Base64.getUrlEncoder().encodeToString("005".getBytes(UTF_8)); + String b64006 = Base64.getUrlEncoder().encodeToString("006".getBytes(UTF_8)); + 
String b64007 = Base64.getUrlEncoder().encodeToString("007".getBytes(UTF_8)); + String b64008 = Base64.getUrlEncoder().encodeToString("008".getBytes(UTF_8)); + + String expected = "{'destinations':[{'fileName':'f1.rf','startRow':null,'endRow':'" + b64003 + + "','rangeType':'TABLE'}," + "{'fileName':'f2.rf','startRow':'" + b64004 + "','endRow':'" + + b64007 + "','rangeType':'FILE'}," + "{'fileName':'f1.rf','startRow':'" + b64005 + + "','endRow':'" + b64006 + "','rangeType':'TABLE'}," + "{'fileName':'f3.rf','startRow':'" + + b64008 + "','endRow':null,'rangeType':'TABLE'}]}"; + + assertEquals(expected.replace("'", "\""), json); + } + + @Test + public void testTableSplits() { + assertThrows(IllegalArgumentException.class, + () -> new LoadPlan.TableSplits(new Text("004"), new Text("004"))); + assertThrows(IllegalArgumentException.class, + () -> new LoadPlan.TableSplits(new Text("004"), new Text("003"))); } private static String toString(Destination d) { @@ -110,4 +154,8 @@ private static String toString(Destination d) { private static String toString(byte[] r) { return r == null ? null : new String(r, UTF_8); } + + public static Set<String> toString(Collection<Destination> destinations) { + return destinations.stream().map(d -> toString(d)).collect(Collectors.toSet()); + } } diff --git a/core/src/test/java/org/apache/accumulo/core/file/rfile/RFileTest.java b/core/src/test/java/org/apache/accumulo/core/file/rfile/RFileTest.java index 81c76d58ff6..38b2b954342 100644 --- a/core/src/test/java/org/apache/accumulo/core/file/rfile/RFileTest.java +++ b/core/src/test/java/org/apache/accumulo/core/file/rfile/RFileTest.java @@ -207,7 +207,6 @@ static String formatString(String prefix, int i) { public void test1() throws IOException { // test an empty file - TestRFile trf = new TestRFile(conf); trf.openWriter(); diff --git a/test/src/main/java/org/apache/accumulo/test/functional/BulkNewIT.java b/test/src/main/java/org/apache/accumulo/test/functional/BulkNewIT.java index ee9d418b68d..ee90d4841f0 100644 --- a/test/src/main/java/org/apache/accumulo/test/functional/BulkNewIT.java +++ b/test/src/main/java/org/apache/accumulo/test/functional/BulkNewIT.java @@ -30,6 +30,7 @@ import java.io.FileNotFoundException; import java.io.IOException; import java.math.BigInteger; +import java.net.URI; import java.nio.file.Files; import java.nio.file.Paths; import java.security.MessageDigest; @@ -88,6 +89,7 @@ import org.apache.accumulo.server.constraints.MetadataConstraints; import org.apache.accumulo.server.constraints.SystemEnvironment; import org.apache.accumulo.test.util.Wait; +import org.apache.commons.io.IOUtils; import org.apache.hadoop.conf.Configuration; import org.apache.hadoop.fs.FSDataOutputStream; import org.apache.hadoop.fs.FileSystem; @@ -471,6 +473,63 @@ public void testBadLoadPlans() throws Exception { } } + + @Test + public void testComputeLoadPlan() throws Exception { + + try (AccumuloClient c = Accumulo.newClient().from(getClientProps()).build()) { + addSplits(c, tableName, "0333 0666 0999 1333 1666"); + + String dir = getDir("/testBulkFile-"); + + Map<String,Set<String>> hashes = new HashMap<>(); + String h1 = writeData(dir + "/f1.", aconf, 0, 333); + hashes.put("0333", new HashSet<>(List.of(h1))); + String h2 = writeData(dir + "/f2.", aconf, 0, 666); + hashes.get("0333").add(h2); + hashes.put("0666", new HashSet<>(List.of(h2))); + String h3 = writeData(dir + "/f3.", aconf, 334, 700); + hashes.get("0666").add(h3); + hashes.put("0999", new HashSet<>(List.of(h3))); + hashes.put("1333", Set.of()); + hashes.put("1666", Set.of()); +
hashes.put("null", Set.of()); + + SortedSet splits = new TreeSet<>(c.tableOperations().listSplits(tableName)); + + for (String filename : List.of("f1.rf", "f2.rf", "f3.rf")) { + // The body of this loop simulates what each reducer would do + Path path = new Path(dir + "/" + filename); + + // compute the load plan for the rfile + URI file = path.toUri(); + String lpJson = LoadPlan.compute(file, LoadPlan.SplitResolver.from(splits)).toJson(); + + // save the load plan to a file + Path lpPath = new Path(path.getParent(), path.getName().replace(".rf", ".lp")); + try (var output = getCluster().getFileSystem().create(lpPath, false)) { + IOUtils.write(lpJson, output, UTF_8); + } + } + + // This simulates the code that would run after the map reduce job and bulk import the files + var builder = LoadPlan.builder(); + for (var status : getCluster().getFileSystem().listStatus(new Path(dir), + p -> p.getName().endsWith(".lp"))) { + try (var input = getCluster().getFileSystem().open(status.getPath())) { + String lpJson = IOUtils.toString(input, UTF_8); + builder.addPlan(LoadPlan.fromJson(lpJson)); + } + } + + LoadPlan lpAll = builder.build(); + + c.tableOperations().importDirectory(dir).to(tableName).plan(lpAll).load(); + + verifyData(c, tableName, 0, 700, false); + verifyMetadata(c, tableName, hashes); + } + } + @Test public void testEmptyDir() throws Exception { try (AccumuloClient c = Accumulo.newClient().from(getClientProps()).build()) { @@ -616,7 +675,7 @@ private void verifyMetadata(AccumuloClient client, String tableName, String endRow = tablet.getEndRow() == null ? "null" : tablet.getEndRow().toString(); - assertEquals(expectedHashes.get(endRow), fileHashes); + assertEquals(expectedHashes.get(endRow), fileHashes, "endRow " + endRow); endRowsSeen.add(endRow); } From e61521f4d67c00813b2807de18b7600bfe9b338c Mon Sep 17 00:00:00 2001 From: Keith Turner Date: Mon, 30 Sep 2024 18:00:43 +0000 Subject: [PATCH 2/6] fix build and bug --- .../core/client/rfile/LoadPlanCollector.java | 3 ++- .../accumulo/core/client/rfile/RFileWriter.java | 1 + .../org/apache/accumulo/core/data/LoadPlan.java | 6 ++++-- .../accumulo/core/client/rfile/RFileClientTest.java | 13 +++++++++++-- .../org/apache/accumulo/core/data/LoadPlanTest.java | 6 +++--- 5 files changed, 21 insertions(+), 8 deletions(-) diff --git a/core/src/main/java/org/apache/accumulo/core/client/rfile/LoadPlanCollector.java b/core/src/main/java/org/apache/accumulo/core/client/rfile/LoadPlanCollector.java index 1ee4c8b7133..511e3fed51c 100644 --- a/core/src/main/java/org/apache/accumulo/core/client/rfile/LoadPlanCollector.java +++ b/core/src/main/java/org/apache/accumulo/core/client/rfile/LoadPlanCollector.java @@ -103,11 +103,12 @@ public void startLocalityGroup() { } public LoadPlan getLoadPlan(String filename) { + Preconditions.checkState(finished, "Attempted to get load plan before closing"); + if (appended == 0) { return LoadPlan.builder().build(); } - Preconditions.checkState(finished, "Attempted to get load plan before closing"); if (splitResolver == null) { return LoadPlan.builder().loadFileTo(filename, LoadPlan.RangeType.FILE, firstRow, lastRow) .build(); diff --git a/core/src/main/java/org/apache/accumulo/core/client/rfile/RFileWriter.java b/core/src/main/java/org/apache/accumulo/core/client/rfile/RFileWriter.java index 819dbb34d8d..230bbe3c369 100644 --- a/core/src/main/java/org/apache/accumulo/core/client/rfile/RFileWriter.java +++ b/core/src/main/java/org/apache/accumulo/core/client/rfile/RFileWriter.java @@ -273,6 +273,7 @@ public void close() 
throws IOException { * @return load plan computed from the keys written to the rfile. * @see org.apache.accumulo.core.client.rfile.RFile.WriterOptions#withSplitResolver(LoadPlan.SplitResolver) * @since 3.1.0 + * @throws IllegalStateException if attempting to get the load plan before calling {@link #close()} */ public LoadPlan getLoadPlan(String filename) { return loadPlanCollector.getLoadPlan(filename); diff --git a/core/src/main/java/org/apache/accumulo/core/data/LoadPlan.java b/core/src/main/java/org/apache/accumulo/core/data/LoadPlan.java index 6931dd4251f..d712bce32de 100644 --- a/core/src/main/java/org/apache/accumulo/core/data/LoadPlan.java +++ b/core/src/main/java/org/apache/accumulo/core/data/LoadPlan.java @@ -362,10 +362,12 @@ public Text getEndRow() { @Override public boolean equals(Object o) { - if (this == o) + if (this == o) { return true; - if (o == null || getClass() != o.getClass()) + } + if (o == null || getClass() != o.getClass()) { return false; + } TableSplits that = (TableSplits) o; return Objects.equals(prevRow, that.prevRow) && Objects.equals(endRow, that.endRow); } diff --git a/core/src/test/java/org/apache/accumulo/core/client/rfile/RFileClientTest.java b/core/src/test/java/org/apache/accumulo/core/client/rfile/RFileClientTest.java index 276126c589e..8876e7c600f 100644 --- a/core/src/test/java/org/apache/accumulo/core/client/rfile/RFileClientTest.java +++ b/core/src/test/java/org/apache/accumulo/core/client/rfile/RFileClientTest.java @@ -961,8 +961,14 @@ public void testLoadPlanEmpty() throws Exception { } var writer = builder.build(); + // can not get load plan before closing file + assertThrows(IllegalStateException.class, + () -> writer.getLoadPlan(new Path(testFile).getName())); + try (writer) { writer.startDefaultLocalityGroup(); + assertThrows(IllegalStateException.class, + () -> writer.getLoadPlan(new Path(testFile).getName())); } var loadPlan = writer.getLoadPlan(new Path(testFile).getName()); assertEquals(0, loadPlan.getDestinations().size()); @@ -1085,11 +1091,14 @@ public void testGetLoadPlanBeforeClose() throws Exception { String testFile = createTmpTestFile(); var writer = RFile.newWriter().to(testFile).withFileSystem(localFs).build(); try (writer) { - writer.startDefaultLocalityGroup(); - writer.append(new Key("004", "F4"), "V2"); var e = assertThrows(IllegalStateException.class, () -> writer.getLoadPlan(new Path(testFile).getName())); assertEquals("Attempted to get load plan before closing", e.getMessage()); + writer.startDefaultLocalityGroup(); + writer.append(new Key("004", "F4"), "V2"); + var e2 = assertThrows(IllegalStateException.class, + () -> writer.getLoadPlan(new Path(testFile).getName())); + assertEquals("Attempted to get load plan before closing", e2.getMessage()); } } } diff --git a/core/src/test/java/org/apache/accumulo/core/data/LoadPlanTest.java b/core/src/test/java/org/apache/accumulo/core/data/LoadPlanTest.java index a2f9f63feea..18f2038163d 100644 --- a/core/src/test/java/org/apache/accumulo/core/data/LoadPlanTest.java +++ b/core/src/test/java/org/apache/accumulo/core/data/LoadPlanTest.java @@ -130,9 +130,9 @@ public void testJson() { String b64008 = Base64.getUrlEncoder().encodeToString("008".getBytes(UTF_8)); String expected = "{'destinations':[{'fileName':'f1.rf','startRow':null,'endRow':'" + b64003 - + "','rangeType':'TABLE'}," + "{'fileName':'f2.rf','startRow':'" + b64004 + "','endRow':'" - + b64007 + "','rangeType':'FILE'}," + "{'fileName':'f1.rf','startRow':'" + b64005 - + "','endRow':'" + b64006 + "','rangeType':'TABLE'}," +
"{'fileName':'f3.rf','startRow':'" + + "','rangeType':'TABLE'},{'fileName':'f2.rf','startRow':'" + b64004 + "','endRow':'" + + b64007 + "','rangeType':'FILE'},{'fileName':'f1.rf','startRow':'" + b64005 + + "','endRow':'" + b64006 + "','rangeType':'TABLE'},{'fileName':'f3.rf','startRow':'" + b64008 + "','endRow':null,'rangeType':'TABLE'}]}"; assertEquals(expected.replace("'", "\""), json); From 618cd6372b9894e3e5bbba061599a9ed1627cd10 Mon Sep 17 00:00:00 2001 From: Keith Turner Date: Mon, 30 Sep 2024 18:32:53 +0000 Subject: [PATCH 3/6] fix build and add validation --- .../accumulo/core/client/rfile/RFileWriter.java | 6 +++++- .../core/client/rfile/RFileClientTest.java | 14 ++++++++++++++ 2 files changed, 19 insertions(+), 1 deletion(-) diff --git a/core/src/main/java/org/apache/accumulo/core/client/rfile/RFileWriter.java b/core/src/main/java/org/apache/accumulo/core/client/rfile/RFileWriter.java index 230bbe3c369..e337d2c5b3d 100644 --- a/core/src/main/java/org/apache/accumulo/core/client/rfile/RFileWriter.java +++ b/core/src/main/java/org/apache/accumulo/core/client/rfile/RFileWriter.java @@ -269,13 +269,17 @@ public void close() throws IOException { * of type {@link org.apache.accumulo.core.data.LoadPlan.RangeType#TABLE} that has the split * ranges the rows written overlapped. * - * @param filename + * @param filename This file name will be used in the load plan and it should match the name that + * will be used when bulk importing this file. Only a filename is needed, not a full path. * @return load plan computed from the keys written to the rfile. * @see org.apache.accumulo.core.client.rfile.RFile.WriterOptions#withSplitResolver(LoadPlan.SplitResolver) * @since 3.1.0 * @throws IllegalStateException is attempting to get load plan before calling {@link #close()} + * @throws IllegalArgumentException is a full path is passed instead of a filename */ public LoadPlan getLoadPlan(String filename) { + Preconditions.checkArgument(!filename.contains("/"), + "Unexpected path %s seen instead of file name", filename); return loadPlanCollector.getLoadPlan(filename); } } diff --git a/core/src/test/java/org/apache/accumulo/core/client/rfile/RFileClientTest.java b/core/src/test/java/org/apache/accumulo/core/client/rfile/RFileClientTest.java index 8876e7c600f..393511e8a26 100644 --- a/core/src/test/java/org/apache/accumulo/core/client/rfile/RFileClientTest.java +++ b/core/src/test/java/org/apache/accumulo/core/client/rfile/RFileClientTest.java @@ -1101,4 +1101,18 @@ public void testGetLoadPlanBeforeClose() throws Exception { assertEquals("Attempted to get load plan before closing", e2.getMessage()); } } + + @Test + public void testGetLoadPlanWithPath() throws Exception { + LocalFileSystem localFs = FileSystem.getLocal(new Configuration()); + + String testFile = createTmpTestFile(); + var writer = RFile.newWriter().to(testFile).withFileSystem(localFs).build(); + writer.close(); + + var e = + assertThrows(IllegalArgumentException.class, () -> writer.getLoadPlan(testFile.toString())); + assertTrue(e.getMessage().contains("Unexpected path")); + assertEquals(0, writer.getLoadPlan(new Path(testFile).getName()).getDestinations().size()); + } } From 16b351a0dbf55013f7acecfe4d3bda7cd476cca1 Mon Sep 17 00:00:00 2001 From: Keith Turner Date: Mon, 30 Sep 2024 18:50:25 +0000 Subject: [PATCH 4/6] fix javadoc bug --- .../src/main/java/org/apache/accumulo/core/data/LoadPlan.java | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/core/src/main/java/org/apache/accumulo/core/data/LoadPlan.java 
b/core/src/main/java/org/apache/accumulo/core/data/LoadPlan.java index d712bce32de..56b62cb85b3 100644 --- a/core/src/main/java/org/apache/accumulo/core/data/LoadPlan.java +++ b/core/src/main/java/org/apache/accumulo/core/data/LoadPlan.java @@ -405,8 +405,8 @@ static SplitResolver from(SortedSet splits) { /** * For a given row R this function should find two split points S1 and S2 that exist in the - * table being bulk imported to such that S1 < R <= S2. The closer S1 and S2 are to each other - * the better. + * table being bulk imported to such that S1 < R <= S2. The closer S1 and S2 are to each + * other the better. */ @Override TableSplits apply(Text row); From 8cc15ad6f95d519d67a50559212605210422ebae Mon Sep 17 00:00:00 2001 From: Keith Turner Date: Mon, 30 Sep 2024 19:00:29 +0000 Subject: [PATCH 5/6] improve javadoc --- .../main/java/org/apache/accumulo/core/data/LoadPlan.java | 8 ++++++++ 1 file changed, 8 insertions(+) diff --git a/core/src/main/java/org/apache/accumulo/core/data/LoadPlan.java b/core/src/main/java/org/apache/accumulo/core/data/LoadPlan.java index 56b62cb85b3..1f99d1451a2 100644 --- a/core/src/main/java/org/apache/accumulo/core/data/LoadPlan.java +++ b/core/src/main/java/org/apache/accumulo/core/data/LoadPlan.java @@ -412,6 +412,12 @@ static SplitResolver from(SortedSet splits) { TableSplits apply(Text row); } + /** + * Computes a load plan for a given rfile. This will open the rfile and find every + * {@link TableSplits} that overlaps rows in the file and add those to the returned load plan. + * + * @since 3.1.0 + */ public static LoadPlan compute(URI file, SplitResolver splitResolver) throws IOException { return compute(file, Map.of(), splitResolver); } @@ -420,6 +426,8 @@ public static LoadPlan compute(URI file, SplitResolver splitResolver) throws IOE * Computes a load plan for a given rfile. This will open the rfile and find every * {@link TableSplits} that overlaps rows in the file and add those to the returned load plan. 
* + * @param properties used when opening the rfile, see + * {@link org.apache.accumulo.core.client.rfile.RFile.ScannerOptions#withTableProperties(Map)} * @since 3.1.0 */ public static LoadPlan compute(URI file, Map properties, From 964188ba39b5ee568efecc0dd3f44842284bbadd Mon Sep 17 00:00:00 2001 From: Daniel Roberts ddanielr Date: Wed, 30 Oct 2024 15:07:38 +0000 Subject: [PATCH 6/6] Add new prefix for bulk load working files --- core/src/main/java/org/apache/accumulo/core/Constants.java | 1 + .../apache/accumulo/core/clientImpl/bulk/BulkImport.java | 2 +- .../java/org/apache/accumulo/core/file/FileOperations.java | 7 +++++++ 3 files changed, 9 insertions(+), 1 deletion(-) diff --git a/core/src/main/java/org/apache/accumulo/core/Constants.java b/core/src/main/java/org/apache/accumulo/core/Constants.java index 411af753cfe..f6ff8554258 100644 --- a/core/src/main/java/org/apache/accumulo/core/Constants.java +++ b/core/src/main/java/org/apache/accumulo/core/Constants.java @@ -97,6 +97,7 @@ public class Constants { public static final String BULK_PREFIX = "b-"; public static final String BULK_RENAME_FILE = "renames.json"; public static final String BULK_LOAD_MAPPING = "loadmap.json"; + public static final String BULK_WORKING_PREFIX = "accumulo-bulk-"; public static final String CLONE_PREFIX = "c-"; public static final byte[] CLONE_PREFIX_BYTES = CLONE_PREFIX.getBytes(UTF_8); diff --git a/core/src/main/java/org/apache/accumulo/core/clientImpl/bulk/BulkImport.java b/core/src/main/java/org/apache/accumulo/core/clientImpl/bulk/BulkImport.java index d85c19ac548..899dfba13e2 100644 --- a/core/src/main/java/org/apache/accumulo/core/clientImpl/bulk/BulkImport.java +++ b/core/src/main/java/org/apache/accumulo/core/clientImpl/bulk/BulkImport.java @@ -516,7 +516,7 @@ public static List filterInvalid(FileStatus[] files) { continue; } - if (FileOperations.getBulkWorkingFiles().contains(fname)) { + if (FileOperations.isBulkWorkingFile(fname)) { log.debug("{} is an internal working file, ignoring.", fileStatus.getPath()); continue; } diff --git a/core/src/main/java/org/apache/accumulo/core/file/FileOperations.java b/core/src/main/java/org/apache/accumulo/core/file/FileOperations.java index 5182c614aa8..6a7daefed89 100644 --- a/core/src/main/java/org/apache/accumulo/core/file/FileOperations.java +++ b/core/src/main/java/org/apache/accumulo/core/file/FileOperations.java @@ -55,6 +55,13 @@ public abstract class FileOperations { Set.of(Constants.BULK_LOAD_MAPPING, Constants.BULK_RENAME_FILE, FileOutputCommitter.SUCCEEDED_FILE_NAME, HADOOP_JOBHISTORY_LOCATION); + public static boolean isBulkWorkingFile(String fileName) { + if (fileName.startsWith(Constants.BULK_WORKING_PREFIX)) { + return true; + } + return bulkWorkingFiles.contains(fileName); + } + public static Set getValidExtensions() { return validExtensions; }
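For reference, a minimal sketch of the writer-side flow from patch 1 (the file path and the single split point "m" are illustrative; error handling is omitted):

    import java.util.List;
    import java.util.TreeSet;

    import org.apache.accumulo.core.client.rfile.RFile;
    import org.apache.accumulo.core.data.Key;
    import org.apache.accumulo.core.data.LoadPlan;
    import org.apache.hadoop.conf.Configuration;
    import org.apache.hadoop.fs.FileSystem;
    import org.apache.hadoop.fs.Path;
    import org.apache.hadoop.io.Text;

    public class WriterLoadPlanSketch {
      public static void main(String[] args) throws Exception {
        String file = "/tmp/bulk/f1.rf"; // illustrative path
        var splitResolver = LoadPlan.SplitResolver.from(new TreeSet<>(List.of(new Text("m"))));

        var writer = RFile.newWriter().to(file)
            .withFileSystem(FileSystem.getLocal(new Configuration()))
            .withSplitResolver(splitResolver).build();
        try (writer) {
          writer.startDefaultLocalityGroup();
          writer.append(new Key("a", "F1"), "v1"); // falls in the (null,m] range
          writer.append(new Key("z", "F1"), "v2"); // falls in the (m,null) range
        }

        // Legal only after close(); takes the file name, not the full path (patch 3).
        LoadPlan plan = writer.getLoadPlan(new Path(file).getName());
        String json = plan.toJson(); // ready to hand to the process merging the plans
      }
    }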