diff --git a/janusgraph-backend-testutils/src/main/java/org/janusgraph/graphdb/JanusGraphBaseTest.java b/janusgraph-backend-testutils/src/main/java/org/janusgraph/graphdb/JanusGraphBaseTest.java index a69af6d185..e5cf614628 100644 --- a/janusgraph-backend-testutils/src/main/java/org/janusgraph/graphdb/JanusGraphBaseTest.java +++ b/janusgraph-backend-testutils/src/main/java/org/janusgraph/graphdb/JanusGraphBaseTest.java @@ -747,7 +747,7 @@ public static void evaluateQuery(JanusGraphQuery query, ElementCategory resultTy } protected ScanMetrics executeScanJob(VertexScanJob job) throws Exception { - return executeScanJob(VertexJobConverter.convert(graph,job)); + return executeScanJob(VertexJobConverter.convert(graph, job, null)); } protected ScanMetrics executeScanJob(ScanJob job) throws Exception { diff --git a/janusgraph-backend-testutils/src/main/java/org/janusgraph/graphdb/JanusGraphIndexTest.java b/janusgraph-backend-testutils/src/main/java/org/janusgraph/graphdb/JanusGraphIndexTest.java index d66707855d..1fbfe6fb36 100644 --- a/janusgraph-backend-testutils/src/main/java/org/janusgraph/graphdb/JanusGraphIndexTest.java +++ b/janusgraph-backend-testutils/src/main/java/org/janusgraph/graphdb/JanusGraphIndexTest.java @@ -1451,6 +1451,88 @@ public void testCompositeVsMixedIndexing() { assertTrue(tx.traversal().V().has("intId2", 234).hasNext()); } + @Test + public void testSubsetReindex() throws Exception { + + clopen(option(FORCE_INDEX_USAGE), true); + + mgmt.makeVertexLabel("cat").make(); + mgmt.makeVertexLabel("dog").make(); + + makeKey("id", Integer.class); + makeKey("name", String.class); + final PropertyKey typeKey = makeKey("type", String.class); + + String typeIndex = "searchByType"; + mgmt.buildIndex(typeIndex, Vertex.class) + .addKey(typeKey) + .buildCompositeIndex(); + mgmt.commit(); + + //Cats + int catsCount = 3; + for (int i = 0; i < catsCount; i++) { + Vertex v = tx.addVertex("cat"); + v.property("id", i); + v.property("name", "cat_" + i); + v.property("type", "cat"); + } + + //Dogs + for (int i = 0; i < 5; i++) { + Vertex v = tx.addVertex("dog"); + v.property("id", i); + v.property("name", "dog_" + i); + v.property("type", "dog"); + } + + tx.commit(); + + //Select a subset of vertices to index + clopen(option(FORCE_INDEX_USAGE), true); + List cats = tx.traversal().V().has("type", "cat").toList(); + assertEquals(catsCount, cats.size()); + + List dogs = tx.traversal().V().has("type", "dog").toList(); + assertEquals(5, dogs.size()); + tx.rollback(); + + //Create new Index + graph.getOpenTransactions().forEach(JanusGraphTransaction::rollback); + mgmt = graph.openManagement(); + mgmt.getOpenInstances().stream().filter(i -> !i.contains("current")).forEach(i -> mgmt.forceCloseInstance(i)); + mgmt.commit(); + + String catsNameIndex = "searchByName_CatsOnly"; + mgmt = graph.openManagement(); + mgmt.buildIndex(catsNameIndex, Vertex.class) + .addKey(mgmt.getPropertyKey("name")) + .indexOnly(mgmt.getVertexLabel("cat")) + .buildCompositeIndex(); + mgmt.commit(); + + //Make Index as REGISTERED + mgmt = graph.openManagement(); + mgmt.updateIndex(mgmt.getGraphIndex(catsNameIndex), SchemaAction.REGISTER_INDEX).get(); + mgmt.commit(); + ManagementSystem.awaitGraphIndexStatus(graph, catsNameIndex).status(SchemaStatus.REGISTERED).call(); + + //Reindex a given subset + mgmt = graph.openManagement(); + mgmt.updateIndex(mgmt.getGraphIndex(catsNameIndex), SchemaAction.REINDEX, + cats.stream().map(Element::id).collect(Collectors.toList())).get(); + mgmt.commit(); + ManagementSystem.awaitGraphIndexStatus(graph, catsNameIndex).status(SchemaStatus.ENABLED).call(); + + clopen(option(FORCE_INDEX_USAGE), true); + + for (int i = 0; i < catsCount; i++) { + List catsByName = tx.traversal().V().hasLabel("cat").has("name", "cat_" + i).toList(); + assertEquals(1, catsByName.size()); + } + tx.rollback(); + } + @Test public void testIndexInlineProperties() throws NoSuchMethodException { diff --git a/janusgraph-core/src/main/java/org/janusgraph/core/schema/JanusGraphManagement.java b/janusgraph-core/src/main/java/org/janusgraph/core/schema/JanusGraphManagement.java index a2fc563e89..5da96a3751 100644 --- a/janusgraph-core/src/main/java/org/janusgraph/core/schema/JanusGraphManagement.java +++ b/janusgraph-core/src/main/java/org/janusgraph/core/schema/JanusGraphManagement.java @@ -25,6 +25,7 @@ import org.janusgraph.diskstorage.keycolumnvalue.scan.ScanJobFuture; import java.time.Duration; +import java.util.List; import java.util.Set; /** @@ -341,6 +342,17 @@ interface IndexBuilder { */ ScanJobFuture updateIndex(Index index, SchemaAction updateAction, int numOfThreads); + /** + * Updates the provided index according to the given {@link SchemaAction} for + * the given subset of vertices. + * + * @param index + * @param updateAction + * @param vertexOnly Set of vertexIds that only should be considered for index update + * @return a future that completes when the index action is done + */ + ScanJobFuture updateIndex(Index index, SchemaAction updateAction, List vertexOnly); + /** * If an index update job was triggered through {@link #updateIndex(Index, SchemaAction)} with schema actions * {@link org.janusgraph.core.schema.SchemaAction#REINDEX} or {@link org.janusgraph.core.schema.SchemaAction#DISCARD_INDEX} diff --git a/janusgraph-core/src/main/java/org/janusgraph/diskstorage/common/DistributedStoreManager.java b/janusgraph-core/src/main/java/org/janusgraph/diskstorage/common/DistributedStoreManager.java index 0896d8d3cd..89ed24d547 100644 --- a/janusgraph-core/src/main/java/org/janusgraph/diskstorage/common/DistributedStoreManager.java +++ b/janusgraph-core/src/main/java/org/janusgraph/diskstorage/common/DistributedStoreManager.java @@ -28,6 +28,7 @@ import static org.janusgraph.graphdb.configuration.GraphDatabaseConfiguration.AUTH_PASSWORD; import static org.janusgraph.graphdb.configuration.GraphDatabaseConfiguration.AUTH_USERNAME; import static org.janusgraph.graphdb.configuration.GraphDatabaseConfiguration.CONNECTION_TIMEOUT; +import static org.janusgraph.graphdb.configuration.GraphDatabaseConfiguration.KEYS_SIZE; import static org.janusgraph.graphdb.configuration.GraphDatabaseConfiguration.PAGE_SIZE; import static org.janusgraph.graphdb.configuration.GraphDatabaseConfiguration.STORAGE_HOSTS; import static org.janusgraph.graphdb.configuration.GraphDatabaseConfiguration.STORAGE_PORT; @@ -69,6 +70,7 @@ public enum Deployment { protected final int port; protected final Duration connectionTimeoutMS; protected final int pageSize; + protected final int keysSize; protected final String username; protected final String password; @@ -83,6 +85,7 @@ public DistributedStoreManager(Configuration storageConfig, int portDefault) { else this.port = portDefault; this.connectionTimeoutMS = storageConfig.get(CONNECTION_TIMEOUT); this.pageSize = storageConfig.get(PAGE_SIZE); + this.keysSize = storageConfig.get(KEYS_SIZE); this.times = storageConfig.get(TIMESTAMP_PROVIDER); if (storageConfig.has(AUTH_USERNAME)) { @@ -121,6 +124,15 @@ public int getPageSize() { return pageSize; } + /** + * Returns the default configured keys size for this storage backend. The keys size is used to determine + * how many keys/partitions to request from storage within single request. + * @return + */ + public int getKeysSize() { + return keysSize; + } + /* * TODO this should go away once we have a JanusGraphConfig that encapsulates TimestampProvider */ diff --git a/janusgraph-core/src/main/java/org/janusgraph/diskstorage/keycolumnvalue/KeyColumnValueStore.java b/janusgraph-core/src/main/java/org/janusgraph/diskstorage/keycolumnvalue/KeyColumnValueStore.java index 4973433406..6ec14ea850 100644 --- a/janusgraph-core/src/main/java/org/janusgraph/diskstorage/keycolumnvalue/KeyColumnValueStore.java +++ b/janusgraph-core/src/main/java/org/janusgraph/diskstorage/keycolumnvalue/KeyColumnValueStore.java @@ -14,6 +14,7 @@ package org.janusgraph.diskstorage.keycolumnvalue; +import org.apache.commons.lang.NotImplementedException; import org.janusgraph.diskstorage.BackendException; import org.janusgraph.diskstorage.Entry; import org.janusgraph.diskstorage.EntryList; @@ -181,6 +182,10 @@ default Map> getMultiSlices(MultiKeysQu */ void acquireLock(StaticBuffer key, StaticBuffer column, StaticBuffer expectedValue, StoreTransaction txh) throws BackendException; + default KeyIterator getKeys(final List keys, final SliceQuery query, final StoreTransaction txh) throws BackendException { + throw new NotImplementedException(); + } + /** * Returns a {@link KeyIterator} over all keys that fall within the key-range specified by the given query and have one or more columns matching the column-range. * Calling {@link KeyIterator#getEntries()} returns the list of all entries that match the column-range specified by the given query. diff --git a/janusgraph-core/src/main/java/org/janusgraph/diskstorage/keycolumnvalue/scan/MultiThreadsRowsCollector.java b/janusgraph-core/src/main/java/org/janusgraph/diskstorage/keycolumnvalue/scan/MultiThreadsRowsCollector.java index 3e0c7f9bc4..5ddf82b313 100644 --- a/janusgraph-core/src/main/java/org/janusgraph/diskstorage/keycolumnvalue/scan/MultiThreadsRowsCollector.java +++ b/janusgraph-core/src/main/java/org/janusgraph/diskstorage/keycolumnvalue/scan/MultiThreadsRowsCollector.java @@ -61,6 +61,7 @@ class MultiThreadsRowsCollector extends RowsCollector { private final StoreTransaction storeTx; private final List queries; private final Predicate keyFilter; + private final List keysToScan; private final Configuration graphConfiguration; private final DataPuller[] pullThreads; private final BlockingQueue[] dataQueues; @@ -72,6 +73,7 @@ class MultiThreadsRowsCollector extends RowsCollector { StoreTransaction storeTx, List queries, Predicate keyFilter, + List keysToScan, BlockingQueue rowQueue, Configuration graphConfiguration) throws BackendException { @@ -80,6 +82,7 @@ class MultiThreadsRowsCollector extends RowsCollector { this.storeTx = storeTx; this.queries = queries; this.keyFilter = keyFilter; + this.keysToScan = keysToScan; this.graphConfiguration = graphConfiguration; this.dataQueues = new BlockingQueue[queries.size()]; @@ -189,8 +192,14 @@ private void addDataPuller(SliceQuery sq, StoreTransaction stx, int pos) throws this.graphConfiguration.get(GraphDatabaseConfiguration.PAGE_SIZE)); dataQueues[pos] = queue; - DataPuller dp = new DataPuller(sq, queue, - KCVSUtil.getKeys(store,sq,storeFeatures, MAX_KEY_LENGTH,stx), keyFilter); + KeyIterator keyIterator; + if (keysToScan != null) { + keyIterator = store.getKeys(keysToScan, sq, stx); + } else { + keyIterator = KCVSUtil.getKeys(store, sq, storeFeatures, MAX_KEY_LENGTH, stx); + } + + DataPuller dp = new DataPuller(sq, queue, keyIterator, keyFilter); pullThreads[pos] = dp; dp.setName("data-puller-" + pos); // setting the name for thread dumps! dp.start(); diff --git a/janusgraph-core/src/main/java/org/janusgraph/diskstorage/keycolumnvalue/scan/ScanJob.java b/janusgraph-core/src/main/java/org/janusgraph/diskstorage/keycolumnvalue/scan/ScanJob.java index ed123b4352..043a138d81 100644 --- a/janusgraph-core/src/main/java/org/janusgraph/diskstorage/keycolumnvalue/scan/ScanJob.java +++ b/janusgraph-core/src/main/java/org/janusgraph/diskstorage/keycolumnvalue/scan/ScanJob.java @@ -113,6 +113,14 @@ default void workerIterationEnd(ScanMetrics metrics) {} */ List getQueries(); + /** + * Get keys to scan by the job. If stream is empty, all keys will be scanned. + * @return + */ + default List getKeysToScan() { + return null; + } + /** * A predicate that determines whether * {@link #process(org.janusgraph.diskstorage.StaticBuffer, java.util.Map, ScanMetrics)} diff --git a/janusgraph-core/src/main/java/org/janusgraph/diskstorage/keycolumnvalue/scan/StandardScannerExecutor.java b/janusgraph-core/src/main/java/org/janusgraph/diskstorage/keycolumnvalue/scan/StandardScannerExecutor.java index 8efd3ed2f4..3c2d681693 100644 --- a/janusgraph-core/src/main/java/org/janusgraph/diskstorage/keycolumnvalue/scan/StandardScannerExecutor.java +++ b/janusgraph-core/src/main/java/org/janusgraph/diskstorage/keycolumnvalue/scan/StandardScannerExecutor.java @@ -167,7 +167,7 @@ private RowsCollector buildScanner(BlockingQueue processorQueue, List keys, final SliceQuery query, final StoreTransaction txh) throws BackendException { + return runWithMetrics(txh, metricsStoreName, M_GET_KEYS, () -> { + final KeyIterator ki = backend.getKeys(keys, query, txh); + if (txh.getConfiguration().hasGroupName()) { + return MetricInstrumentedIterator.of(ki, txh.getConfiguration().getGroupName(), metricsStoreName, M_GET_KEYS, M_ITERATOR); + } else { + return ki; + } + }); + } + @Override public KeyIterator getKeys(final KeyRangeQuery query, final StoreTransaction txh) throws BackendException { return runWithMetrics(txh, metricsStoreName, M_GET_KEYS, () -> { diff --git a/janusgraph-core/src/main/java/org/janusgraph/graphdb/configuration/GraphDatabaseConfiguration.java b/janusgraph-core/src/main/java/org/janusgraph/graphdb/configuration/GraphDatabaseConfiguration.java index 8237e3a9d0..7c51ad087a 100644 --- a/janusgraph-core/src/main/java/org/janusgraph/graphdb/configuration/GraphDatabaseConfiguration.java +++ b/janusgraph-core/src/main/java/org/janusgraph/graphdb/configuration/GraphDatabaseConfiguration.java @@ -830,6 +830,10 @@ public boolean apply(@Nullable String s) { "up to this many elements.", ConfigOption.Type.MASKABLE, 100); + public static final ConfigOption KEYS_SIZE = new ConfigOption<>(STORAGE_NS,"keys-size", + "The maximum amount of keys/partitions to retrieve from distributed storage system by JanusGraph in a single request.", + ConfigOption.Type.MASKABLE, 100); + public static final ConfigOption DROP_ON_CLEAR = new ConfigOption<>(STORAGE_NS, "drop-on-clear", "Whether to drop the graph database (true) or delete rows (false) when clearing storage. " + "Note that some backends always drop the graph database when clearing storage. Also note that indices are " + diff --git a/janusgraph-core/src/main/java/org/janusgraph/graphdb/database/management/ManagementSystem.java b/janusgraph-core/src/main/java/org/janusgraph/graphdb/database/management/ManagementSystem.java index 302071428d..7925668bf7 100644 --- a/janusgraph-core/src/main/java/org/janusgraph/graphdb/database/management/ManagementSystem.java +++ b/janusgraph-core/src/main/java/org/janusgraph/graphdb/database/management/ManagementSystem.java @@ -914,11 +914,20 @@ public JanusGraphIndex buildMixedIndex(String backingIndex) { --------------- */ @Override public ScanJobFuture updateIndex(Index index, SchemaAction updateAction) { - return updateIndex(index, updateAction, Runtime.getRuntime().availableProcessors()); + return updateIndex(index, updateAction, null, Runtime.getRuntime().availableProcessors()); } @Override public ScanJobFuture updateIndex(Index index, SchemaAction updateAction, int numOfThreads) { + return updateIndex(index, updateAction, null, numOfThreads); + } + + @Override + public ScanJobFuture updateIndex(Index index, SchemaAction updateAction, List vertexOnly) { + return updateIndex(index, updateAction, vertexOnly, Runtime.getRuntime().availableProcessors()); + } + + private ScanJobFuture updateIndex(Index index, SchemaAction updateAction, List vertexOnly, int numOfThreads) { Preconditions.checkArgument(index != null, "Need to provide an index"); Preconditions.checkArgument(updateAction != null, "Need to provide update action"); @@ -967,7 +976,7 @@ public ScanJobFuture updateIndex(Index index, SchemaAction updateAction, int num builder.setFinishJob(indexId.getIndexJobFinisher(graph, SchemaAction.ENABLE_INDEX)); builder.setJobId(indexId); builder.setNumProcessingThreads(numOfThreads); - builder.setJob(VertexJobConverter.convert(graph, new IndexRepairJob(indexId.indexName, indexId.relationTypeName))); + builder.setJob(VertexJobConverter.convert(graph, new IndexRepairJob(indexId.indexName, indexId.relationTypeName), vertexOnly)); try { future = builder.execute(); } catch (BackendException e) { diff --git a/janusgraph-core/src/main/java/org/janusgraph/graphdb/olap/AbstractScanJob.java b/janusgraph-core/src/main/java/org/janusgraph/graphdb/olap/AbstractScanJob.java index 34f3d0d3c7..62a060593b 100644 --- a/janusgraph-core/src/main/java/org/janusgraph/graphdb/olap/AbstractScanJob.java +++ b/janusgraph-core/src/main/java/org/janusgraph/graphdb/olap/AbstractScanJob.java @@ -28,7 +28,7 @@ public abstract class AbstractScanJob implements ScanJob { protected final GraphProvider graph; protected StandardJanusGraphTx tx; - private IDManager idManager; + protected IDManager idManager; public AbstractScanJob(JanusGraph graph) { this.graph = new GraphProvider(); diff --git a/janusgraph-core/src/main/java/org/janusgraph/graphdb/olap/VertexJobConverter.java b/janusgraph-core/src/main/java/org/janusgraph/graphdb/olap/VertexJobConverter.java index 4f86ac3db6..72ac0f9151 100644 --- a/janusgraph-core/src/main/java/org/janusgraph/graphdb/olap/VertexJobConverter.java +++ b/janusgraph-core/src/main/java/org/janusgraph/graphdb/olap/VertexJobConverter.java @@ -34,6 +34,7 @@ import java.util.List; import java.util.Map; import java.util.function.Predicate; +import java.util.stream.Collectors; /** * @author Matthias Broecheler (me@matthiasb.com) @@ -50,23 +51,31 @@ public class VertexJobConverter extends AbstractScanJob { protected final VertexScanJob job; + protected final List vertexIdsToScan; + protected VertexJobConverter(JanusGraph graph, VertexScanJob job) { + this(graph, job, null); + } + + protected VertexJobConverter(JanusGraph graph, VertexScanJob job, List vertexIdsToScan) { super(graph); Preconditions.checkArgument(job!=null); this.job = job; + this.vertexIdsToScan = vertexIdsToScan; } protected VertexJobConverter(VertexJobConverter copy) { super(copy); this.job = copy.job.clone(); + this.vertexIdsToScan = copy.vertexIdsToScan; } - public static ScanJob convert(JanusGraph graph, VertexScanJob vertexJob) { - return new VertexJobConverter(graph,vertexJob); + public static ScanJob convert(JanusGraph graph, VertexScanJob vertexJob, List vertexIdsToScan) { + return new VertexJobConverter(graph, vertexJob, vertexIdsToScan); } public static ScanJob convert(VertexScanJob vertexJob) { - return new VertexJobConverter(null,vertexJob); + return new VertexJobConverter(null, vertexJob, null); } @Override @@ -130,6 +139,18 @@ public List getQueries() { } } + @Override + public List getKeysToScan() { + if (this.vertexIdsToScan == null) { + return null; + } else { + return this.vertexIdsToScan + .stream() + .map(k -> idManager.getKey(k)) + .collect(Collectors.toList()); + } + } + @Override public Predicate getKeyFilter() { return buffer -> !IDManager.VertexIDType.Invisible.is(getVertexId(buffer)); diff --git a/janusgraph-cql/src/main/java/org/janusgraph/diskstorage/cql/CQLKeyColumnValueStore.java b/janusgraph-cql/src/main/java/org/janusgraph/diskstorage/cql/CQLKeyColumnValueStore.java index b01136a176..f983205126 100644 --- a/janusgraph-cql/src/main/java/org/janusgraph/diskstorage/cql/CQLKeyColumnValueStore.java +++ b/janusgraph-cql/src/main/java/org/janusgraph/diskstorage/cql/CQLKeyColumnValueStore.java @@ -121,12 +121,12 @@ public class CQLKeyColumnValueStore implements KeyColumnValueStore { public static final Function EXCEPTION_MAPPER = cause -> { cause = CompletableFutureUtil.unwrapExecutionException(cause); - if(cause instanceof InterruptedException || cause.getCause() instanceof InterruptedException){ + if (cause instanceof InterruptedException || cause.getCause() instanceof InterruptedException) { Thread.currentThread().interrupt(); return new PermanentBackendException(cause instanceof InterruptedException ? cause : cause.getCause()); } - if(cause instanceof BackendException){ - return (BackendException) cause; + if (cause instanceof BackendException || cause.getCause() instanceof BackendException) { + return (BackendException) (cause instanceof BackendException ? cause : cause.getCause()); } return Match(cause).of( Case($(instanceOf(QueryValidationException.class)), PermanentBackendException::new), @@ -479,6 +479,17 @@ public void acquireLock(final StaticBuffer key, final StaticBuffer column, final } } + @Override + public KeyIterator getKeys(final List keys, final SliceQuery query, final StoreTransaction txh) throws BackendException { + return Try.of(() -> new CQLMapKeyIterator(new CQLSubsetIterator<>(keys, this.storeManager.getKeysSize(), (keysList) -> { + try { + return getSlice(keysList, query, txh).entrySet().iterator(); + } catch (BackendException e) { + throw new RuntimeException(e); + } + }))).getOrElseThrow(EXCEPTION_MAPPER); + } + @Override public KeyIterator getKeys(final KeyRangeQuery query, final StoreTransaction txh) throws BackendException { if (!this.storeManager.getFeatures().hasOrderedScan()) { diff --git a/janusgraph-cql/src/main/java/org/janusgraph/diskstorage/cql/CQLMapKeyIterator.java b/janusgraph-cql/src/main/java/org/janusgraph/diskstorage/cql/CQLMapKeyIterator.java new file mode 100644 index 0000000000..129c0f982c --- /dev/null +++ b/janusgraph-cql/src/main/java/org/janusgraph/diskstorage/cql/CQLMapKeyIterator.java @@ -0,0 +1,81 @@ +// Copyright 2024 JanusGraph Authors +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +package org.janusgraph.diskstorage.cql; + +import org.janusgraph.diskstorage.Entry; +import org.janusgraph.diskstorage.EntryList; +import org.janusgraph.diskstorage.StaticBuffer; +import org.janusgraph.diskstorage.keycolumnvalue.KeyIterator; +import org.janusgraph.diskstorage.util.RecordIterator; + +import java.io.IOException; +import java.util.Iterator; +import java.util.Map; + +public class CQLMapKeyIterator implements KeyIterator { + + private Map.Entry currentEntry = null; + + private final Iterator> entriesIterator; + + public CQLMapKeyIterator(Iterator> entriesIterator) { + this.entriesIterator = entriesIterator; + } + + @Override + public RecordIterator getEntries() { + return new EntryRecordIterator(this.currentEntry.getValue()); + } + + @Override + public void close() throws IOException { + //NOP + } + + @Override + public boolean hasNext() { + return this.entriesIterator.hasNext(); + } + + @Override + public StaticBuffer next() { + this.currentEntry = this.entriesIterator.next(); + return currentEntry.getKey(); + } + + static class EntryRecordIterator implements RecordIterator { + + private final Iterator entryIterator; + + public EntryRecordIterator(EntryList entryList) { + this.entryIterator = entryList.iterator(); + } + + @Override + public void close() throws IOException { + //NOP + } + + @Override + public boolean hasNext() { + return entryIterator.hasNext(); + } + + @Override + public Entry next() { + return entryIterator.next(); + } + } +} diff --git a/janusgraph-cql/src/main/java/org/janusgraph/diskstorage/cql/CQLSubsetIterator.java b/janusgraph-cql/src/main/java/org/janusgraph/diskstorage/cql/CQLSubsetIterator.java new file mode 100644 index 0000000000..cadf15218a --- /dev/null +++ b/janusgraph-cql/src/main/java/org/janusgraph/diskstorage/cql/CQLSubsetIterator.java @@ -0,0 +1,69 @@ +// Copyright 2024 JanusGraph Authors +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +package org.janusgraph.diskstorage.cql; + +import com.google.common.collect.Iterables; +import org.janusgraph.diskstorage.StaticBuffer; + +import java.util.ArrayList; +import java.util.Collections; +import java.util.Iterator; +import java.util.List; +import java.util.function.Function; + +public class CQLSubsetIterator implements Iterator { + + private Iterator itemIterator; + private final int keysSliceSize; + private final Iterable keysIter; + private final Function, Iterator> iteratorSupplier; + + public CQLSubsetIterator(List keys, int keysSliceSize, Function, Iterator> iteratorSupplier) { + this.iteratorSupplier = iteratorSupplier; + Iterator keysConverted = keys.iterator(); + this.keysIter = () -> keysConverted; + this.keysSliceSize = keysSliceSize; + this.fetchSubset(); + } + + @Override + public boolean hasNext() { + while (!this.itemIterator.hasNext()) { + boolean hasKeys = fetchSubset(); + if (!hasKeys) { + return false; + } + } + return true; + } + + @Override + public TItem next() { + return this.itemIterator.next(); + } + + private boolean fetchSubset() { + List subset = new ArrayList<>(this.keysSliceSize); + Iterables.limit(this.keysIter, this.keysSliceSize).forEach(subset::add); + + if (subset.isEmpty()) { + this.itemIterator = Collections.emptyIterator(); + return false; + } else { + this.itemIterator = iteratorSupplier.apply(subset); + return true; + } + } +} diff --git a/janusgraph-cql/src/test/java/org/janusgraph/diskstorage/cql/CQLSubsetIteratorTest.java b/janusgraph-cql/src/test/java/org/janusgraph/diskstorage/cql/CQLSubsetIteratorTest.java new file mode 100644 index 0000000000..f682ceaa5a --- /dev/null +++ b/janusgraph-cql/src/test/java/org/janusgraph/diskstorage/cql/CQLSubsetIteratorTest.java @@ -0,0 +1,163 @@ +// Copyright 2024 JanusGraph Authors +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +package org.janusgraph.diskstorage.cql; + +import com.google.common.collect.Lists; +import io.vavr.collection.Array; +import io.vavr.collection.Iterator; +import org.janusgraph.diskstorage.StaticBuffer; +import org.janusgraph.diskstorage.util.WriteByteBuffer; +import org.junit.jupiter.api.Test; + +import java.nio.charset.StandardCharsets; +import java.util.ArrayList; +import java.util.Collections; +import java.util.List; +import java.util.stream.Collectors; + +import static org.junit.Assert.assertTrue; +import static org.junit.jupiter.api.Assertions.assertEquals; +import static org.junit.jupiter.api.Assertions.assertFalse; +import static org.junit.jupiter.api.Assertions.fail; + +public class CQLSubsetIteratorTest { + + @Test + public void testIterator() { + + int keysSliceSize = 2; + List inputKeys = Array.of("key1", "key2", "key3", "key4").asJava(); + List checkKeys = new ArrayList<>(4); + + final List keys = encodeKeys(inputKeys); + + CQLSubsetIterator subsetIterator = new CQLSubsetIterator<>(keys, + keysSliceSize, + (subList) -> { + assertEquals(subList.size(), 2); + subList.stream().map(this::decodeKey).forEach(checkKeys::add); + return Array.of(10, 20).iterator(); + }); + + assertTrue(subsetIterator.hasNext()); + assertEquals(Array.of(10, 20, 10, 20).asJava(), Lists.newArrayList(subsetIterator)); + assertFalse(subsetIterator.hasNext()); + assertEquals(inputKeys, checkKeys); + } + + @Test + public void testIteratorNotEven() { + + int keysSliceSize = 2; + List inputKeys = Array.of("key1", "key2", "key3").asJava(); + List checkKeys = new ArrayList<>(3); + + final List keys = encodeKeys(inputKeys); + + CQLSubsetIterator subsetIterator = new CQLSubsetIterator<>(keys, + keysSliceSize, + (subList) -> { + subList.stream().map(this::decodeKey).forEach(checkKeys::add); + return Array.of(10, 20).iterator(); + }); + + assertTrue(subsetIterator.hasNext()); + assertEquals(Array.of(10, 20, 10, 20).asJava(), Lists.newArrayList(subsetIterator)); + assertFalse(subsetIterator.hasNext()); + assertEquals(inputKeys, checkKeys); + } + + @Test + public void testPartialEmptyResults() { + + int keysSliceSize = 1; + List inputKeys = Array.of("key1", "key2", "key3", "key4").asJava(); + List checkKeys = new ArrayList<>(3); + + final List keys = encodeKeys(inputKeys); + + CQLSubsetIterator subsetIterator = new CQLSubsetIterator<>(keys, + keysSliceSize, + (subList) -> { + assertEquals(subList.size(), 1); + String key = this.decodeKey(subList.get(0)); + checkKeys.add(key); + if (!key.equals("key3")) { + return Iterator.empty(); + } else { + return Array.of(10, 20).iterator(); + } + }); + + assertTrue(subsetIterator.hasNext()); + assertEquals(Array.of(10, 20).asJava(), Lists.newArrayList(subsetIterator)); + assertFalse(subsetIterator.hasNext()); + assertEquals(inputKeys, checkKeys); + } + + @Test + public void testEmptyResults() { + + int keysSliceSize = 1; + List inputKeys = Array.of("key1", "key2", "key3").asJava(); + List checkKeys = new ArrayList<>(3); + + final List keys = encodeKeys(inputKeys); + + CQLSubsetIterator subsetIterator = new CQLSubsetIterator<>(keys, + keysSliceSize, + (subList) -> { + assertEquals(subList.size(), 1); + String key = this.decodeKey(subList.get(0)); + checkKeys.add(key); + return Iterator.empty(); + + }); + + assertFalse(subsetIterator.hasNext()); + assertEquals(inputKeys, checkKeys); + } + + @Test + public void testEmptyKeys() { + + int keysSliceSize = 2; + final List keys = Collections.emptyList(); + + CQLSubsetIterator subsetIterator = new CQLSubsetIterator<>(keys, + keysSliceSize, + (subList) -> { + fail("Iterator should have never been called"); + return Array.of(1, 2, 3, 4, 5, 6).iterator(); + }); + + assertFalse(subsetIterator.hasNext()); + } + + private List encodeKeys(List keys) { + return keys.stream() + .map(keyStr -> { + byte[] bytes = keyStr.getBytes(); + WriteByteBuffer bb = new WriteByteBuffer(bytes.length); + bb.putBytes(bytes); + return bb.getStaticBuffer(); + }) + .collect(Collectors.toList()); + } + + private String decodeKey(StaticBuffer key) { + return StandardCharsets.UTF_8.decode(key.asByteBuffer()).toString(); + } +} diff --git a/janusgraph-es/src/test/java/org/janusgraph/diskstorage/es/BerkeleyElasticsearchTest.java b/janusgraph-es/src/test/java/org/janusgraph/diskstorage/es/BerkeleyElasticsearchTest.java index 45dca41fdb..cdf8c8ba11 100644 --- a/janusgraph-es/src/test/java/org/janusgraph/diskstorage/es/BerkeleyElasticsearchTest.java +++ b/janusgraph-es/src/test/java/org/janusgraph/diskstorage/es/BerkeleyElasticsearchTest.java @@ -75,4 +75,9 @@ public void testDisableAndDiscardManuallyAndDropEnabledIndex() throws Exception public void testDiscardAndDropRegisteredIndex() throws ExecutionException, InterruptedException { super.testDiscardAndDropRegisteredIndex(); } + + @Test + public void testSubsetReindex() { + //Not supported + } } diff --git a/janusgraph-hadoop/src/main/java/org/janusgraph/hadoop/scan/HadoopVertexScanMapper.java b/janusgraph-hadoop/src/main/java/org/janusgraph/hadoop/scan/HadoopVertexScanMapper.java index 6282f2dea6..76970bb57a 100644 --- a/janusgraph-hadoop/src/main/java/org/janusgraph/hadoop/scan/HadoopVertexScanMapper.java +++ b/janusgraph-hadoop/src/main/java/org/janusgraph/hadoop/scan/HadoopVertexScanMapper.java @@ -36,7 +36,7 @@ protected void setup(Context context) throws IOException, InterruptedException { VertexScanJob vertexScan = getVertexScanJob(scanConf); ModifiableConfiguration graphConf = getJanusGraphConfiguration(context); graph = JanusGraphFactory.open(graphConf); - job = VertexJobConverter.convert(graph, vertexScan); + job = VertexJobConverter.convert(graph, vertexScan, null); metrics = new HadoopContextScanMetrics(context); finishSetup(scanConf, graphConf); } diff --git a/janusgraph-lucene/src/test/java/org/janusgraph/diskstorage/lucene/BerkeleyLuceneTest.java b/janusgraph-lucene/src/test/java/org/janusgraph/diskstorage/lucene/BerkeleyLuceneTest.java index 332e80ccaa..20b8592545 100644 --- a/janusgraph-lucene/src/test/java/org/janusgraph/diskstorage/lucene/BerkeleyLuceneTest.java +++ b/janusgraph-lucene/src/test/java/org/janusgraph/diskstorage/lucene/BerkeleyLuceneTest.java @@ -156,4 +156,8 @@ public void testCreateMixedIndexThatPreviouslyExisted() { assertThrows(UnsupportedOperationException.class, super::testCreateMixedIndexThatPreviouslyExisted); } + @Test + public void testSubsetReindex() { + //Not supported + } }