Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Offers new ways to compute bulk import load plans. #4933

Open
wants to merge 6 commits into
base: 3.1
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions core/src/main/java/org/apache/accumulo/core/Constants.java
Original file line number Diff line number Diff line change
Expand Up @@ -97,6 +97,7 @@ public class Constants {
public static final String BULK_PREFIX = "b-";
public static final String BULK_RENAME_FILE = "renames.json";
public static final String BULK_LOAD_MAPPING = "loadmap.json";
public static final String BULK_WORKING_PREFIX = "accumulo-bulk-";

public static final String CLONE_PREFIX = "c-";
public static final byte[] CLONE_PREFIX_BYTES = CLONE_PREFIX.getBytes(UTF_8);
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,131 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* https://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/
package org.apache.accumulo.core.client.rfile;

import java.util.HashSet;
import java.util.Set;

import org.apache.accumulo.core.data.Key;
import org.apache.accumulo.core.data.LoadPlan;
import org.apache.accumulo.core.data.TableId;
import org.apache.accumulo.core.dataImpl.KeyExtent;
import org.apache.hadoop.io.Text;

import com.google.common.base.Preconditions;

/**
 * Tracks the keys written to an rfile so that a {@link LoadPlan} can be computed for bulk import.
 * Operates in one of two modes: when a {@link LoadPlan.SplitResolver} is supplied, rows are mapped
 * to table split ranges and a TABLE-range plan is produced; otherwise only the overall first and
 * last rows are tracked and a single FILE-range plan is produced.
 *
 * <p>
 * Not thread safe. Expected call pattern: {@code append()*} interleaved with
 * {@code startLocalityGroup()}, then {@code close()}, then {@code getLoadPlan()}.
 */
class LoadPlanCollector {

  // The load plan only uses the rows from the KeyExtents created below, never the table id, so any
  // placeholder id works. KeyExtent is used because it provides containment checks and hashes well
  // in a set for deduplicating split ranges.
  private static final TableId FAKE_ID = TableId.of("123");

  // null when operating in first/last-row (FILE range) mode
  private final LoadPlan.SplitResolver splitResolver;
  private boolean finished = false;
  // first and last rows seen in the locality group currently being written
  private Text lgFirstRow;
  private Text lgLastRow;
  // overall first and last rows across all completed locality groups
  private Text firstRow;
  private Text lastRow;
  // split ranges seen so far, excluding currentExtent; null when splitResolver is null
  private final Set<KeyExtent> overlappingExtents;
  // the split range the most recently appended key fell in
  private KeyExtent currentExtent;
  private long appended = 0;

  LoadPlanCollector(LoadPlan.SplitResolver splitResolver) {
    this.splitResolver = splitResolver;
    this.overlappingExtents = new HashSet<>();
  }

  LoadPlanCollector() {
    this.splitResolver = null;
    this.overlappingExtents = null;
  }

  // Track the first and last row seen for the current locality group.
  private void appendNoSplits(Key key) {
    if (lgFirstRow == null) {
      lgFirstRow = key.getRow();
      lgLastRow = lgFirstRow;
    } else {
      lgLastRow = key.getRow();
    }
  }

  // Resolve the key's row to a table split range, remembering each distinct range seen. Only
  // consults the resolver when the row falls outside the current range.
  private void appendSplits(Key key) {
    var row = key.getRow();
    if (currentExtent == null || !currentExtent.contains(row)) {
      var tableSplits = splitResolver.apply(row);
      var extent = new KeyExtent(FAKE_ID, tableSplits.getEndRow(), tableSplits.getPrevRow());
      Preconditions.checkState(extent.contains(row), "%s does not contain %s", tableSplits, row);
      if (currentExtent != null) {
        overlappingExtents.add(currentExtent);
      }
      currentExtent = extent;
    }
  }

  /**
   * Records the row of a key written to the rfile.
   *
   * @throws IllegalStateException if called after {@link #close()}; appending after close would
   *         silently produce a load plan missing the later keys.
   */
  public void append(Key key) {
    Preconditions.checkState(!finished, "Attempted to append after closing");
    if (splitResolver == null) {
      appendNoSplits(key);
    } else {
      appendSplits(key);
    }
    appended++;
  }

  /**
   * Folds the current locality group's first/last rows into the overall first/last rows and resets
   * the per-group tracking. Safe to call when no keys were appended to the current group.
   */
  public void startLocalityGroup() {
    if (lgFirstRow != null) {
      if (firstRow == null) {
        firstRow = lgFirstRow;
        lastRow = lgLastRow;
      } else {
        // take the minimum
        firstRow = firstRow.compareTo(lgFirstRow) < 0 ? firstRow : lgFirstRow;
        // take the maximum
        lastRow = lastRow.compareTo(lgLastRow) > 0 ? lastRow : lgLastRow;
      }
      lgFirstRow = null;
      lgLastRow = null;
    }
  }

  /**
   * @param filename the file name to use in the load plan ranges
   * @return an empty plan when nothing was appended; a FILE-range plan using the overall first and
   *         last rows when no split resolver was configured; otherwise a TABLE-range plan covering
   *         every split range the appended rows overlapped.
   * @throws IllegalStateException if called before {@link #close()}
   */
  public LoadPlan getLoadPlan(String filename) {
    Preconditions.checkState(finished, "Attempted to get load plan before closing");

    if (appended == 0) {
      return LoadPlan.builder().build();
    }

    if (splitResolver == null) {
      return LoadPlan.builder().loadFileTo(filename, LoadPlan.RangeType.FILE, firstRow, lastRow)
          .build();
    } else {
      var builder = LoadPlan.builder();
      // currentExtent is non-null here because appended > 0 in split mode; adding it to the set is
      // idempotent, so calling getLoadPlan more than once is safe.
      overlappingExtents.add(currentExtent);
      for (var extent : overlappingExtents) {
        builder.loadFileTo(filename, LoadPlan.RangeType.TABLE, extent.prevEndRow(),
            extent.endRow());
      }
      return builder.build();
    }
  }

  /** Marks collection finished and folds in the final locality group's rows. Idempotent. */
  public void close() {
    finished = true;
    // compute the overall min and max rows
    startLocalityGroup();
  }
}
Original file line number Diff line number Diff line change
Expand Up @@ -35,6 +35,7 @@
import org.apache.accumulo.core.client.summary.Summary.FileStatistics;
import org.apache.accumulo.core.conf.Property;
import org.apache.accumulo.core.data.Key;
import org.apache.accumulo.core.data.LoadPlan;
import org.apache.accumulo.core.data.Range;
import org.apache.accumulo.core.security.Authorizations;
import org.apache.accumulo.core.util.RowRangeUtil;
Expand Down Expand Up @@ -463,6 +464,15 @@ default WriterOptions withSummarizers(SummarizerConfiguration... summarizerConf)
*/
WriterOptions withVisibilityCacheSize(int maxSize);

/**
 * Enables computing a {@link LoadPlan} for the rfile as it is written. The resolver maps each
 * appended row to a table split range, and the resulting plan can be obtained from
 * {@link RFileWriter#getLoadPlan(String)} after the writer is closed.
 *
 * @param splitResolver maps a row to the table split range containing it
 * @return this
 * @see RFileWriter#getLoadPlan(String)
 * @since 3.1.0
 */
WriterOptions withSplitResolver(LoadPlan.SplitResolver splitResolver);

/**
* @return a new RfileWriter created with the options previously specified.
*/
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -29,6 +29,7 @@
import org.apache.accumulo.core.data.ArrayByteSequence;
import org.apache.accumulo.core.data.ByteSequence;
import org.apache.accumulo.core.data.Key;
import org.apache.accumulo.core.data.LoadPlan;
import org.apache.accumulo.core.data.Value;
import org.apache.accumulo.core.file.FileSKVWriter;
import org.apache.accumulo.core.security.ColumnVisibility;
Expand Down Expand Up @@ -92,12 +93,17 @@ public class RFileWriter implements AutoCloseable {

private final FileSKVWriter writer;
private final LRUMap<ByteSequence,Boolean> validVisibilities;

// TODO should be able to completely remove this as lower level code is already doing some things
Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I will remove this todo and open an issue before this PR is merged.

// like tracking first and last keys per LG. Added to get simple initial impl before optimizing.
private final LoadPlanCollector loadPlanCollector;
private boolean startedLG;
private boolean startedDefaultLG;

RFileWriter(FileSKVWriter fileSKVWriter, int visCacheSize) {
RFileWriter(FileSKVWriter fileSKVWriter, int visCacheSize, LoadPlanCollector loadPlanCollector) {
this.writer = fileSKVWriter;
this.validVisibilities = new LRUMap<>(visCacheSize);
this.loadPlanCollector = loadPlanCollector;
}

private void _startNewLocalityGroup(String name, Set<ByteSequence> columnFamilies)
Expand All @@ -106,6 +112,7 @@ private void _startNewLocalityGroup(String name, Set<ByteSequence> columnFamilie
"Cannot start a locality group after starting the default locality group");
writer.startNewLocalityGroup(name, columnFamilies);
startedLG = true;
loadPlanCollector.startLocalityGroup();
}

/**
Expand Down Expand Up @@ -175,6 +182,7 @@ public void startNewLocalityGroup(String name, String... families) throws IOExce

public void startDefaultLocalityGroup() throws IOException {
Preconditions.checkState(!startedDefaultLG);
loadPlanCollector.startLocalityGroup();
writer.startDefaultLocalityGroup();
startedDefaultLG = true;
startedLG = true;
Expand Down Expand Up @@ -204,6 +212,7 @@ public void append(Key key, Value val) throws IOException {
validVisibilities.put(new ArrayByteSequence(Arrays.copyOf(cv, cv.length)), Boolean.TRUE);
}
writer.append(key, val);
loadPlanCollector.append(key);
}

/**
Expand Down Expand Up @@ -250,5 +259,27 @@ public void append(Iterable<Entry<Key,Value>> keyValues) throws IOException {
@Override
public void close() throws IOException {
  // Close the underlying writer first; if it throws, the load plan collector is never finished
  // and getLoadPlan() will fail its finished-state check rather than return a partial plan.
  writer.close();
  loadPlanCollector.close();
}

/**
 * If no split resolver was provided when the RFileWriter was built then this method will return a
 * simple load plan of type {@link org.apache.accumulo.core.data.LoadPlan.RangeType#FILE} using
 * the first and last row seen. If a splitResolver was provided then this will return a load plan
 * of type {@link org.apache.accumulo.core.data.LoadPlan.RangeType#TABLE} that has the split
 * ranges the rows written overlapped.
 *
 * @param filename This file name will be used in the load plan and it should match the name that
 *        will be used when bulk importing this file. Only a filename is needed, not a full path.
 * @return load plan computed from the keys written to the rfile.
 * @see org.apache.accumulo.core.client.rfile.RFile.WriterOptions#withSplitResolver(LoadPlan.SplitResolver)
 * @since 3.1.0
 * @throws IllegalStateException if attempting to get load plan before calling {@link #close()}
 * @throws IllegalArgumentException if a full path is passed instead of a filename
 */
public LoadPlan getLoadPlan(String filename) {
  Preconditions.checkArgument(!filename.contains("/"),
      "Unexpected path %s seen instead of file name", filename);
  return loadPlanCollector.getLoadPlan(filename);
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -37,6 +37,7 @@
import org.apache.accumulo.core.conf.ConfigurationCopy;
import org.apache.accumulo.core.conf.DefaultConfiguration;
import org.apache.accumulo.core.crypto.CryptoFactoryLoader;
import org.apache.accumulo.core.data.LoadPlan;
import org.apache.accumulo.core.file.FileOperations;
import org.apache.accumulo.core.metadata.UnreferencedTabletFile;
import org.apache.accumulo.core.metadata.ValidationUtil;
Expand Down Expand Up @@ -73,6 +74,7 @@ OutputStream getOutputStream() {
private int visCacheSize = 1000;
private Map<String,String> samplerProps = Collections.emptyMap();
private Map<String,String> summarizerProps = Collections.emptyMap();
private LoadPlan.SplitResolver splitResolver;

private void checkDisjoint(Map<String,String> props, Map<String,String> derivedProps,
String kind) {
Expand Down Expand Up @@ -107,6 +109,13 @@ public RFileWriter build() throws IOException {
CryptoService cs =
CryptoFactoryLoader.getServiceForClient(CryptoEnvironment.Scope.TABLE, tableConfig);

LoadPlanCollector loadPlanCollector;
if (splitResolver != null) {
loadPlanCollector = new LoadPlanCollector(splitResolver);
} else {
loadPlanCollector = new LoadPlanCollector();
}

if (out.getOutputStream() != null) {
FSDataOutputStream fsdo;
if (out.getOutputStream() instanceof FSDataOutputStream) {
Expand All @@ -117,12 +126,13 @@ public RFileWriter build() throws IOException {
return new RFileWriter(
fileops.newWriterBuilder().forOutputStream(".rf", fsdo, out.getConf(), cs)
.withTableConfiguration(acuconf).withStartDisabled().build(),
visCacheSize);
visCacheSize, loadPlanCollector);
} else {
return new RFileWriter(fileops.newWriterBuilder()
.forFile(UnreferencedTabletFile.of(out.getFileSystem(), out.path), out.getFileSystem(),
out.getConf(), cs)
.withTableConfiguration(acuconf).withStartDisabled().build(), visCacheSize);
.withTableConfiguration(acuconf).withStartDisabled().build(), visCacheSize,
loadPlanCollector);
}
}

Expand Down Expand Up @@ -174,6 +184,12 @@ public WriterOptions withVisibilityCacheSize(int maxSize) {
return this;
}

@Override
public WriterOptions withSplitResolver(LoadPlan.SplitResolver splitResolver) {
  // Stored until build(), which uses it to create a split-aware LoadPlanCollector.
  this.splitResolver = splitResolver;
  return this;
}

@Override
public WriterOptions withSummarizers(SummarizerConfiguration... summarizerConf) {
Objects.requireNonNull(summarizerConf);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -47,6 +47,7 @@
import java.util.concurrent.ExecutionException;
import java.util.concurrent.Executor;
import java.util.concurrent.ExecutorService;
import java.util.function.Function;
import java.util.stream.Stream;

import org.apache.accumulo.core.Constants;
Expand Down Expand Up @@ -313,19 +314,25 @@ public interface KeyExtentCache {
KeyExtent lookup(Text row);
}

public static List<KeyExtent> findOverlappingTablets(KeyExtentCache extentCache,
FileSKVIterator reader) throws IOException {
/**
 * Function that will find a row in a file being bulk imported that is &gt;= the row passed to the
 * function. If there is no such row then it should return null.
 */
@FunctionalInterface
public interface NextRowFunction {
  Text apply(Text row) throws IOException;
}

public static List<KeyExtent> findOverlappingTablets(Function<Text,KeyExtent> rowToExtentResolver,
NextRowFunction nextRowFunction) throws IOException {

List<KeyExtent> result = new ArrayList<>();
Collection<ByteSequence> columnFamilies = Collections.emptyList();
Text row = new Text();
while (true) {
reader.seek(new Range(row, null), columnFamilies, false);
if (!reader.hasTop()) {
row = nextRowFunction.apply(row);
if (row == null) {
break;
}
row = reader.getTopKey().getRow();
KeyExtent extent = extentCache.lookup(row);
KeyExtent extent = rowToExtentResolver.apply(row);
result.add(extent);
row = extent.endRow();
if (row != null) {
Expand All @@ -345,12 +352,22 @@ private static Text nextRow(Text row) {
}

public static List<KeyExtent> findOverlappingTablets(ClientContext context,
KeyExtentCache extentCache, UnreferencedTabletFile file, FileSystem fs,
KeyExtentCache keyExtentCache, UnreferencedTabletFile file, FileSystem fs,
Cache<String,Long> fileLenCache, CryptoService cs) throws IOException {
try (FileSKVIterator reader = FileOperations.getInstance().newReaderBuilder()
.forFile(file, fs, fs.getConf(), cs).withTableConfiguration(context.getConfiguration())
.withFileLenCache(fileLenCache).seekToBeginning().build()) {
return findOverlappingTablets(extentCache, reader);

Collection<ByteSequence> columnFamilies = Collections.emptyList();
NextRowFunction nextRowFunction = row -> {
reader.seek(new Range(row, null), columnFamilies, false);
if (!reader.hasTop()) {
return null;
}
return reader.getTopKey().getRow();
};

return findOverlappingTablets(keyExtentCache::lookup, nextRowFunction);
}
}

Expand Down Expand Up @@ -499,7 +516,7 @@ public static List<FileStatus> filterInvalid(FileStatus[] files) {
continue;
}

if (FileOperations.getBulkWorkingFiles().contains(fname)) {
if (FileOperations.isBulkWorkingFile(fname)) {
log.debug("{} is an internal working file, ignoring.", fileStatus.getPath());
continue;
}
Expand Down
Loading