Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

prototype of bulk import v2 distributed file examination #4898

Draft
wants to merge 22 commits into
base: 2.1
Choose a base branch
from
Draft
Show file tree
Hide file tree
Changes from 16 commits
Commits
Show all changes
22 commits
Select commit Hold shift + click to select a range
aa593ad
prototype of bulk import v2 distributed file examination
keith-turner Sep 17, 2024
407358a
format code
keith-turner Sep 17, 2024
fd70d34
fix build
keith-turner Sep 17, 2024
058328a
remove tight coupling to SortedSet by adding simple indirection
keith-turner Sep 18, 2024
c8c5f21
format code
keith-turner Sep 18, 2024
9ef0bcf
use Rfile api to read
keith-turner Sep 18, 2024
368b2a4
adds cache
keith-turner Sep 19, 2024
e228b68
adds ability to construct load plan while writing to rfile
keith-turner Sep 26, 2024
9328277
adds tests and javadoc
keith-turner Sep 27, 2024
3190d19
fail build when local mods
keith-turner Sep 27, 2024
174b4e0
update pom for including sha in version
keith-turner Sep 27, 2024
f82d111
revert pom change
keith-turner Sep 27, 2024
9c7dc66
Revert "update pom for including sha in version"
keith-turner Sep 30, 2024
4285753
cleanup
keith-turner Sep 30, 2024
ec0febb
more cleanup
keith-turner Sep 30, 2024
97e4684
fix validation bug
keith-turner Sep 30, 2024
aabe2d8
Update core/src/main/java/org/apache/accumulo/core/data/LoadPlan.java
keith-turner Oct 1, 2024
2003eae
Update core/src/test/java/org/apache/accumulo/core/data/LoadPlanTest.…
keith-turner Oct 1, 2024
667f12e
Update core/src/test/java/org/apache/accumulo/core/data/LoadPlanTest.…
keith-turner Oct 1, 2024
a5ead55
Update core/src/test/java/org/apache/accumulo/core/data/LoadPlanTest.…
keith-turner Oct 1, 2024
926dec7
sync w/ 3.1 changes
keith-turner Oct 1, 2024
235945b
Add new prefix for bulk load working files
ddanielr Oct 30, 2024
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
@@ -0,0 +1,131 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* https://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/
package org.apache.accumulo.core.client.rfile;

import java.util.HashSet;
import java.util.Set;

import org.apache.accumulo.core.data.Key;
import org.apache.accumulo.core.data.LoadPlan;
import org.apache.accumulo.core.data.TableId;
import org.apache.accumulo.core.dataImpl.KeyExtent;
import org.apache.hadoop.io.Text;

import com.google.common.base.Preconditions;

/**
 * Tracks the keys appended to an rfile so that a {@link LoadPlan} can be produced after the file
 * is closed. Operates in one of two modes: without a split resolver it only records the overall
 * first and last rows (producing a single FILE-type range); with a split resolver it records the
 * set of table tablets (extents) the appended rows overlapped (producing TABLE-type ranges).
 *
 * <p>Not thread safe; intended to be driven by a single writer.
 */
class LoadPlanCollector {

  // LoadPlan ranges only carry rows, so any table id works for the temporary extents.
  private static final TableId FAKE_ID = TableId.of("123");

  private final LoadPlan.SplitResolver splitResolver;
  // Non-null only when a split resolver was supplied; holds every extent seen so far
  // except the one currently being appended to.
  private final Set<KeyExtent> overlappingExtents;

  private boolean finished = false;
  private long appended = 0;

  // first/last rows seen within the locality group currently being written
  private Text lgFirstRow;
  private Text lgLastRow;
  // first/last rows seen across all locality groups completed so far
  private Text firstRow;
  private Text lastRow;

  // extent containing the most recently appended row (split-resolver mode only)
  private KeyExtent currentExtent;

  LoadPlanCollector(LoadPlan.SplitResolver splitResolver) {
    this.splitResolver = splitResolver;
    this.overlappingExtents = new HashSet<>();
  }

  LoadPlanCollector() {
    this.splitResolver = null;
    this.overlappingExtents = null;
  }

  // Tracks min/max row for the current locality group when no split resolver is present.
  private void appendNoSplits(Key key) {
    Text row = key.getRow();
    if (lgFirstRow == null) {
      lgFirstRow = row;
    }
    lgLastRow = row;
  }

  // Resolves the tablet containing the row and records a transition to a new extent.
  private void appendSplits(Key key) {
    Text row = key.getRow();
    if (currentExtent != null && currentExtent.contains(row)) {
      // still inside the tablet of the previous key, nothing to do
      return;
    }
    var tableSplits = splitResolver.apply(row);
    var extent = new KeyExtent(FAKE_ID, tableSplits.getEndRow(), tableSplits.getPrevRow());
    // the resolver must return a split range that actually contains the row
    Preconditions.checkState(extent.contains(row), "%s does not contain %s", tableSplits, row);
    if (currentExtent != null) {
      overlappingExtents.add(currentExtent);
    }
    currentExtent = extent;
  }

  public void append(Key key) {
    if (splitResolver == null) {
      appendNoSplits(key);
    } else {
      appendSplits(key);
    }
    appended++;
  }

  /**
   * Folds the per-locality-group min/max rows into the overall min/max and resets the
   * per-group state. Called when a new locality group starts and once more on close.
   */
  public void startLocalityGroup() {
    if (lgFirstRow == null) {
      // previous group had no appends (or split-resolver mode), nothing to fold in
      return;
    }
    if (firstRow == null) {
      firstRow = lgFirstRow;
      lastRow = lgLastRow;
    } else {
      if (lgFirstRow.compareTo(firstRow) < 0) {
        firstRow = lgFirstRow;
      }
      if (lgLastRow.compareTo(lastRow) > 0) {
        lastRow = lgLastRow;
      }
    }
    lgFirstRow = null;
    lgLastRow = null;
  }

  /**
   * @param filename the file name to place in the returned load plan
   * @return a load plan covering everything appended; empty plan if nothing was appended
   * @throws IllegalStateException if called before {@link #close()}
   */
  public LoadPlan getLoadPlan(String filename) {
    Preconditions.checkState(finished, "Attempted to get load plan before closing");

    if (appended == 0) {
      return LoadPlan.builder().build();
    }

    if (splitResolver == null) {
      // single FILE range spanning everything written
      return LoadPlan.builder().loadFileTo(filename, LoadPlan.RangeType.FILE, firstRow, lastRow)
          .build();
    }

    // include the extent that was still in progress when the file was closed
    overlappingExtents.add(currentExtent);
    var builder = LoadPlan.builder();
    for (KeyExtent extent : overlappingExtents) {
      builder.loadFileTo(filename, LoadPlan.RangeType.TABLE, extent.prevEndRow(),
          extent.endRow());
    }
    return builder.build();
  }

  public void close() {
    finished = true;
    // compute the overall min and max rows
    startLocalityGroup();
  }
}
Original file line number Diff line number Diff line change
Expand Up @@ -34,6 +34,7 @@
import org.apache.accumulo.core.client.summary.Summary.FileStatistics;
import org.apache.accumulo.core.conf.Property;
import org.apache.accumulo.core.data.Key;
import org.apache.accumulo.core.data.LoadPlan;
import org.apache.accumulo.core.data.Range;
import org.apache.accumulo.core.security.Authorizations;
import org.apache.hadoop.fs.FileSystem;
Expand Down Expand Up @@ -428,6 +429,15 @@ default WriterOptions withSummarizers(SummarizerConfiguration... summarizerConf)
*/
WriterOptions withVisibilityCacheSize(int maxSize);

/**
 * Enables tracking of a {@link LoadPlan} while the rfile is written. The given resolver supplies
 * the table's split points, which are used to map each appended row to a tablet range.
 *
 * @param splitResolver maps a row to the table split range containing it
 * @return this
 * @see RFileWriter#getLoadPlan(String)
 * @since 2.1.4
 */
WriterOptions withSplitResolver(LoadPlan.SplitResolver splitResolver);

/**
* @return a new RfileWriter created with the options previously specified.
*/
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -29,6 +29,7 @@
import org.apache.accumulo.core.data.ArrayByteSequence;
import org.apache.accumulo.core.data.ByteSequence;
import org.apache.accumulo.core.data.Key;
import org.apache.accumulo.core.data.LoadPlan;
import org.apache.accumulo.core.data.Value;
import org.apache.accumulo.core.file.FileSKVWriter;
import org.apache.accumulo.core.security.ColumnVisibility;
Expand Down Expand Up @@ -92,12 +93,17 @@ public class RFileWriter implements AutoCloseable {

private final FileSKVWriter writer;
private final LRUMap<ByteSequence,Boolean> validVisibilities;

// TODO should be able to completely remove this as lower level code is already doing some things
// like tracking first and last keys per LG. Added to get simple initial impl before optimizing.
private final LoadPlanCollector loadPlanCollector;
private boolean startedLG;
private boolean startedDefaultLG;

RFileWriter(FileSKVWriter fileSKVWriter, int visCacheSize) {
RFileWriter(FileSKVWriter fileSKVWriter, int visCacheSize, LoadPlanCollector loadPlanCollector) {
this.writer = fileSKVWriter;
this.validVisibilities = new LRUMap<>(visCacheSize);
this.loadPlanCollector = loadPlanCollector;
}

private void _startNewLocalityGroup(String name, Set<ByteSequence> columnFamilies)
Expand All @@ -106,6 +112,7 @@ private void _startNewLocalityGroup(String name, Set<ByteSequence> columnFamilie
"Cannot start a locality group after starting the default locality group");
writer.startNewLocalityGroup(name, columnFamilies);
startedLG = true;
loadPlanCollector.startLocalityGroup();
}

/**
Expand Down Expand Up @@ -175,6 +182,7 @@ public void startNewLocalityGroup(String name, String... families) throws IOExce

public void startDefaultLocalityGroup() throws IOException {
Preconditions.checkState(!startedDefaultLG);
loadPlanCollector.startLocalityGroup();
writer.startDefaultLocalityGroup();
startedDefaultLG = true;
startedLG = true;
Expand Down Expand Up @@ -204,6 +212,7 @@ public void append(Key key, Value val) throws IOException {
validVisibilities.put(new ArrayByteSequence(Arrays.copyOf(cv, cv.length)), Boolean.TRUE);
}
writer.append(key, val);
loadPlanCollector.append(key);
}

/**
Expand Down Expand Up @@ -250,5 +259,22 @@ public void append(Iterable<Entry<Key,Value>> keyValues) throws IOException {
@Override
public void close() throws IOException {
writer.close();
loadPlanCollector.close();
}

/**
* If no split resolver was provided when the RFileWriter was built then this method will return a
* simple load plan of type {@link org.apache.accumulo.core.data.LoadPlan.RangeType#FILE} using
* the first and last row seen. If a splitResolver was provided then this will return a load plan
* of type {@link org.apache.accumulo.core.data.LoadPlan.RangeType#TABLE} that has the split
* ranges the rows written overlapped.
*
* @param filename the file name to associate with the ranges in the returned load plan
* @return load plan computed from the keys written to the rfile.
* @see org.apache.accumulo.core.client.rfile.RFile.WriterOptions#withSplitResolver(LoadPlan.SplitResolver)
* @since 2.1.4
*/
public LoadPlan getLoadPlan(String filename) {
  // Delegates to the collector that observed every key appended to this writer.
  // The collector requires close() to have been called first.
  return loadPlanCollector.getLoadPlan(filename);
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -37,6 +37,7 @@
import org.apache.accumulo.core.conf.ConfigurationCopy;
import org.apache.accumulo.core.conf.DefaultConfiguration;
import org.apache.accumulo.core.crypto.CryptoFactoryLoader;
import org.apache.accumulo.core.data.LoadPlan;
import org.apache.accumulo.core.file.FileOperations;
import org.apache.accumulo.core.metadata.ValidationUtil;
import org.apache.accumulo.core.sample.impl.SamplerConfigurationImpl;
Expand Down Expand Up @@ -72,6 +73,7 @@ OutputStream getOutputStream() {
private int visCacheSize = 1000;
private Map<String,String> samplerProps = Collections.emptyMap();
private Map<String,String> summarizerProps = Collections.emptyMap();
private LoadPlan.SplitResolver splitResolver;

private void checkDisjoint(Map<String,String> props, Map<String,String> derivedProps,
String kind) {
Expand Down Expand Up @@ -106,6 +108,13 @@ public RFileWriter build() throws IOException {
CryptoService cs =
CryptoFactoryLoader.getServiceForClient(CryptoEnvironment.Scope.TABLE, tableConfig);

LoadPlanCollector loadPlanCollector;
if (splitResolver != null) {
loadPlanCollector = new LoadPlanCollector(splitResolver);
} else {
loadPlanCollector = new LoadPlanCollector();
}

if (out.getOutputStream() != null) {
FSDataOutputStream fsdo;
if (out.getOutputStream() instanceof FSDataOutputStream) {
Expand All @@ -116,11 +125,13 @@ public RFileWriter build() throws IOException {
return new RFileWriter(
fileops.newWriterBuilder().forOutputStream(".rf", fsdo, out.getConf(), cs)
.withTableConfiguration(acuconf).withStartDisabled().build(),
visCacheSize);
visCacheSize, loadPlanCollector);
} else {
return new RFileWriter(fileops.newWriterBuilder()
.forFile(out.path.toString(), out.getFileSystem(), out.getConf(), cs)
.withTableConfiguration(acuconf).withStartDisabled().build(), visCacheSize);
return new RFileWriter(
fileops.newWriterBuilder()
.forFile(out.path.toString(), out.getFileSystem(), out.getConf(), cs)
.withTableConfiguration(acuconf).withStartDisabled().build(),
visCacheSize, loadPlanCollector);
}
}

Expand Down Expand Up @@ -172,6 +183,12 @@ public WriterOptions withVisibilityCacheSize(int maxSize) {
return this;
}

@Override
public WriterOptions withSplitResolver(LoadPlan.SplitResolver splitResolver) {
  // Stored until build(), where it selects the split-aware LoadPlanCollector.
  this.splitResolver = splitResolver;
  return this;
}

@Override
public WriterOptions withSummarizers(SummarizerConfiguration... summarizerConf) {
Objects.requireNonNull(summarizerConf);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -48,6 +48,7 @@
import java.util.concurrent.ExecutionException;
import java.util.concurrent.Executor;
import java.util.concurrent.ExecutorService;
import java.util.function.Function;
import java.util.stream.Stream;

import org.apache.accumulo.core.Constants;
Expand Down Expand Up @@ -324,19 +325,25 @@ public interface KeyExtentCache {
KeyExtent lookup(Text row);
}

public static List<KeyExtent> findOverlappingTablets(KeyExtentCache extentCache,
FileSKVIterator reader) throws IOException {
/**
 * Function that will find a row in a file being bulk imported that is &gt;= the row passed to the
 * function. If there is no such row then it should return null.
 */
public interface NextRowFunction {
  /**
   * @param row the row to search from, inclusive
   * @return the first row &gt;= the given row, or null when none exists
   * @throws IOException if reading the underlying file fails
   */
  Text apply(Text row) throws IOException;
}

public static List<KeyExtent> findOverlappingTablets(Function<Text,KeyExtent> rowToExtentResolver,
NextRowFunction nextRowFunction) throws IOException {

List<KeyExtent> result = new ArrayList<>();
Collection<ByteSequence> columnFamilies = Collections.emptyList();
Text row = new Text();
while (true) {
reader.seek(new Range(row, null), columnFamilies, false);
if (!reader.hasTop()) {
row = nextRowFunction.apply(row);
if (row == null) {
break;
}
row = reader.getTopKey().getRow();
KeyExtent extent = extentCache.lookup(row);
KeyExtent extent = rowToExtentResolver.apply(row);
result.add(extent);
row = extent.endRow();
if (row != null) {
Expand All @@ -356,13 +363,23 @@ private static Text nextRow(Text row) {
}

/**
 * Opens the given bulk import file and determines which tablets its keys overlap.
 *
 * @param context used to obtain the table configuration for the reader
 * @param keyExtentCache resolves a row to the tablet (extent) containing it
 * @param file the rfile to examine
 * @param fs filesystem the file resides on
 * @param fileLenCache cache of file lengths used when opening the reader
 * @param cs crypto service for decrypting the file
 * @return extents overlapped by the file's keys, in the order encountered
 * @throws IOException if the file cannot be read
 */
public static List<KeyExtent> findOverlappingTablets(ClientContext context,
    KeyExtentCache keyExtentCache, Path file, FileSystem fs, Cache<String,Long> fileLenCache,
    CryptoService cs) throws IOException {
  try (FileSKVIterator reader = FileOperations.getInstance().newReaderBuilder()
      .forFile(file.toString(), fs, fs.getConf(), cs)
      .withTableConfiguration(context.getConfiguration()).withFileLenCache(fileLenCache)
      .seekToBeginning().build()) {

    Collection<ByteSequence> columnFamilies = Collections.emptyList();
    // Adapts the rfile reader to the NextRowFunction abstraction: seek to the requested
    // row and report the first row found, or null at end of file.
    // NOTE: the lambda captures 'reader', so it must only be used inside this try block.
    NextRowFunction nextRowFunction = row -> {
      reader.seek(new Range(row, null), columnFamilies, false);
      if (!reader.hasTop()) {
        return null;
      }
      return reader.getTopKey().getRow();
    };

    return findOverlappingTablets(keyExtentCache::lookup, nextRowFunction);
  }
}

Expand Down
Loading
Loading