From 760dbff926cd594228b7072476e47a7ac3dc1e8e Mon Sep 17 00:00:00 2001 From: Keith Turner Date: Tue, 26 Nov 2024 23:34:28 +0000 Subject: [PATCH] Makes seeding a fate transaction more efficient Modified fate to seed fate transaction in single conditional mutation instead of multiple. fixes #5097 --- .../accumulo/core/fate/AbstractFateStore.java | 11 + .../org/apache/accumulo/core/fate/Fate.java | 47 +--- .../apache/accumulo/core/fate/FateStore.java | 23 +- .../accumulo/core/fate/user/FateMutator.java | 23 ++ .../core/fate/user/FateMutatorImpl.java | 38 ++- .../core/fate/user/RowExistsIterator.java | 46 ++++ .../core/fate/user/UserFateStore.java | 132 +++------- .../core/fate/zookeeper/MetaFateStore.java | 54 +++- .../accumulo/core/logging/FateLogger.java | 43 +-- .../apache/accumulo/core/fate/TestStore.java | 11 +- .../iterators/SetEncodingIterator.java | 2 +- .../compaction/ExternalCompaction_1_IT.java | 24 +- .../org/apache/accumulo/test/fate/FateIT.java | 4 + .../accumulo/test/fate/FateStoreIT.java | 244 +++++++++++++++--- 14 files changed, 493 insertions(+), 209 deletions(-) create mode 100644 core/src/main/java/org/apache/accumulo/core/fate/user/RowExistsIterator.java diff --git a/core/src/main/java/org/apache/accumulo/core/fate/AbstractFateStore.java b/core/src/main/java/org/apache/accumulo/core/fate/AbstractFateStore.java index ff5e45d3103..3bc322c3c21 100644 --- a/core/src/main/java/org/apache/accumulo/core/fate/AbstractFateStore.java +++ b/core/src/main/java/org/apache/accumulo/core/fate/AbstractFateStore.java @@ -69,6 +69,11 @@ public FateId fromTypeAndKey(FateInstanceType instanceType, FateKey fateKey) { UUID txUUID = UUID.nameUUIDFromBytes(fateKey.getSerialized()); return FateId.from(instanceType, txUUID); } + + @Override + public FateId newRandomId(FateInstanceType instanceType) { + return FateId.from(instanceType, UUID.randomUUID()); + } }; // The ZooKeeper lock for the process that's running this store instance @@ -402,6 +407,12 @@ public FateId 
getID() { public interface FateIdGenerator { FateId fromTypeAndKey(FateInstanceType instanceType, FateKey fateKey); + + FateId newRandomId(FateInstanceType instanceType); + } + + protected void seededTx() { + unreservedRunnableCount.increment(); } protected byte[] serializeTxInfo(Serializable so) { diff --git a/core/src/main/java/org/apache/accumulo/core/fate/Fate.java b/core/src/main/java/org/apache/accumulo/core/fate/Fate.java index 1350cce6524..8a368ce0669 100644 --- a/core/src/main/java/org/apache/accumulo/core/fate/Fate.java +++ b/core/src/main/java/org/apache/accumulo/core/fate/Fate.java @@ -62,8 +62,6 @@ import org.slf4j.Logger; import org.slf4j.LoggerFactory; -import com.google.common.base.Preconditions; - /** * Fault tolerant executor */ @@ -403,55 +401,14 @@ public FateId startTransaction() { public Optional seedTransaction(String txName, FateKey fateKey, Repo repo, boolean autoCleanUp, String goalMessage) { - - Optional> optTxStore = store.createAndReserve(fateKey); - - return optTxStore.map(txStore -> { - var fateId = txStore.getID(); - try { - Preconditions.checkState(txStore.getStatus() == NEW); - seedTransaction(txName, fateId, repo, autoCleanUp, goalMessage, txStore); - } finally { - txStore.unreserve(Duration.ZERO); - } - return fateId; - }); - } - - private void seedTransaction(String txName, FateId fateId, Repo repo, boolean autoCleanUp, - String goalMessage, FateTxStore txStore) { - if (txStore.top() == null) { - try { - log.info("Seeding {} {}", fateId, goalMessage); - txStore.push(repo); - } catch (StackOverflowException e) { - // this should not happen - throw new IllegalStateException(e); - } - } - - if (autoCleanUp) { - txStore.setTransactionInfo(TxInfo.AUTO_CLEAN, autoCleanUp); - } - - txStore.setTransactionInfo(TxInfo.TX_NAME, txName); - - txStore.setStatus(SUBMITTED); + return store.seedTransaction(txName, fateKey, repo, autoCleanUp); } // start work in the transaction.. it is safe to call this // multiple times for a transaction... 
but it will only seed once public void seedTransaction(String txName, FateId fateId, Repo repo, boolean autoCleanUp, String goalMessage) { - FateTxStore txStore = store.reserve(fateId); - try { - if (txStore.getStatus() == NEW) { - seedTransaction(txName, fateId, repo, autoCleanUp, goalMessage, txStore); - } - } finally { - txStore.unreserve(Duration.ZERO); - } - + store.seedTransaction(txName, fateId, repo, autoCleanUp); } // check on the transaction diff --git a/core/src/main/java/org/apache/accumulo/core/fate/FateStore.java b/core/src/main/java/org/apache/accumulo/core/fate/FateStore.java index 09ee12dd94e..88e9230db1e 100644 --- a/core/src/main/java/org/apache/accumulo/core/fate/FateStore.java +++ b/core/src/main/java/org/apache/accumulo/core/fate/FateStore.java @@ -50,19 +50,22 @@ public interface FateStore extends ReadOnlyFateStore { FateId create(); /** - * Creates and reserves a transaction using the given key. If something is already running for the - * given key, then Optional.empty() will be returned. When this returns a non-empty id, it will be - * in the new state. + * Seeds a transaction with the given repo if it does not exist. Will set the status to + * SUBMITTED. Will set the fate key. Will set autoCleanup if true. Will set the creation time. * - *

- * In the case where a process dies in the middle of a call to this. If later, another call is - * made with the same key and its in the new state then the FateId for that key will be returned. - *

+ * @return optional w/ the fate id set if seeded and empty optional otherwise + */ + Optional seedTransaction(String txName, FateKey fateKey, Repo repo, + boolean autoCleanUp); + + /** + * Seeds a transaction with the given repo if its current status is NEW and it is currently + * unreserved. Will set the status to SUBMITTED. Will set autoCleanup if true. Will set the + * creation time. * - * @throws IllegalStateException when there is an unexpected collision. This can occur if two key - * hash to the same FateId or if a random FateId already exists. + * @return true if seeded and false otherwise */ - Optional> createAndReserve(FateKey fateKey); + boolean seedTransaction(String txName, FateId fateId, Repo repo, boolean autoCleanUp); /** * An interface that allows read/write access to the data related to a single fate operation. diff --git a/core/src/main/java/org/apache/accumulo/core/fate/user/FateMutator.java b/core/src/main/java/org/apache/accumulo/core/fate/user/FateMutator.java index d199a7463e4..1ba9fcd36c8 100644 --- a/core/src/main/java/org/apache/accumulo/core/fate/user/FateMutator.java +++ b/core/src/main/java/org/apache/accumulo/core/fate/user/FateMutator.java @@ -33,6 +33,29 @@ public interface FateMutator { FateMutator putCreateTime(long ctime); + /** + * Requires that nothing exists for this fate mutation. + */ + FateMutator requireAbsent(); + + /** + * Require that the transaction status is one of the given statuses. If no statuses are provided, + * require that the status column is absent. + * + * @param statuses The statuses to check against. + */ + FateMutator requireStatus(TStatus... statuses); + + /** + * Require the transaction has no reservation. + */ + FateMutator requireUnreserved(); + + /** + * Require the transaction has no fate key set.
+ */ + FateMutator requireAbsentKey(); + /** * Add a conditional mutation to {@link FateSchema.TxColumnFamily#RESERVATION_COLUMN} that will * put the reservation if there is not already a reservation present diff --git a/core/src/main/java/org/apache/accumulo/core/fate/user/FateMutatorImpl.java b/core/src/main/java/org/apache/accumulo/core/fate/user/FateMutatorImpl.java index 5d99a8df3a3..ea7dd85c571 100644 --- a/core/src/main/java/org/apache/accumulo/core/fate/user/FateMutatorImpl.java +++ b/core/src/main/java/org/apache/accumulo/core/fate/user/FateMutatorImpl.java @@ -29,6 +29,7 @@ import org.apache.accumulo.core.client.AccumuloSecurityException; import org.apache.accumulo.core.client.BatchWriter; import org.apache.accumulo.core.client.ConditionalWriter; +import org.apache.accumulo.core.client.IteratorSetting; import org.apache.accumulo.core.client.MutationsRejectedException; import org.apache.accumulo.core.client.Scanner; import org.apache.accumulo.core.client.TableNotFoundException; @@ -48,12 +49,16 @@ import org.apache.accumulo.core.security.Authorizations; import org.apache.hadoop.io.Text; +import com.google.common.base.Preconditions; + public class FateMutatorImpl implements FateMutator { private final ClientContext context; private final String tableName; private final FateId fateId; private final ConditionalMutation mutation; + private boolean requiredUnreserved = false; + public static final int INITIAL_ITERATOR_PRIO = 1000000; public FateMutatorImpl(ClientContext context, String tableName, FateId fateId) { this.context = Objects.requireNonNull(context); @@ -81,10 +86,34 @@ public FateMutator putCreateTime(long ctime) { } @Override - public FateMutator putReservedTx(FateStore.FateReservation reservation) { + public FateMutator requireAbsent() { + IteratorSetting is = new IteratorSetting(INITIAL_ITERATOR_PRIO, RowExistsIterator.class); + Condition c = new Condition("", "").setIterators(is); + mutation.addCondition(c); + return this; + } + + @Override + 
public FateMutator requireUnreserved() { + Preconditions.checkState(!requiredUnreserved); Condition condition = new Condition(TxColumnFamily.RESERVATION_COLUMN.getColumnFamily(), TxColumnFamily.RESERVATION_COLUMN.getColumnQualifier()); mutation.addCondition(condition); + requiredUnreserved = true; + return this; + } + + @Override + public FateMutator requireAbsentKey() { + Condition condition = new Condition(TxColumnFamily.TX_KEY_COLUMN.getColumnFamily(), + TxColumnFamily.TX_KEY_COLUMN.getColumnQualifier()); + mutation.addCondition(condition); + return this; + } + + @Override + public FateMutator putReservedTx(FateStore.FateReservation reservation) { + requireUnreserved(); TxColumnFamily.RESERVATION_COLUMN.put(mutation, new Value(reservation.getSerialized())); return this; } @@ -179,12 +208,7 @@ public FateMutator delete() { return this; } - /** - * Require that the transaction status is one of the given statuses. If no statuses are provided, - * require that the status column is absent. - * - * @param statuses The statuses to check against. - */ + @Override public FateMutator requireStatus(TStatus... statuses) { Condition condition = StatusMappingIterator.createCondition(statuses); mutation.addCondition(condition); diff --git a/core/src/main/java/org/apache/accumulo/core/fate/user/RowExistsIterator.java b/core/src/main/java/org/apache/accumulo/core/fate/user/RowExistsIterator.java new file mode 100644 index 00000000000..6095546b0da --- /dev/null +++ b/core/src/main/java/org/apache/accumulo/core/fate/user/RowExistsIterator.java @@ -0,0 +1,46 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. 
You may obtain a copy of the License at + * + * https://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package org.apache.accumulo.core.fate.user; + +import java.io.IOException; +import java.util.Collection; +import java.util.Set; + +import org.apache.accumulo.core.data.ByteSequence; +import org.apache.accumulo.core.data.Range; +import org.apache.accumulo.core.iterators.WrappingIterator; + +import com.google.common.base.Preconditions; + +/** + * Iterator is used by conditional mutations to check if row exists. + */ +public class RowExistsIterator extends WrappingIterator { + + @Override + public void seek(Range range, Collection columnFamilies, boolean inclusive) + throws IOException { + Preconditions.checkState(range.getStartKey() != null && range.getEndKey() != null); + var startRow = range.getStartKey().getRow(); + var endRow = range.getEndKey().getRow(); + Preconditions.checkState(startRow.equals(endRow)); + Range r = new Range(startRow); + super.seek(r, Set.of(), false); + } +} diff --git a/core/src/main/java/org/apache/accumulo/core/fate/user/UserFateStore.java b/core/src/main/java/org/apache/accumulo/core/fate/user/UserFateStore.java index efd0cbc62f4..e6542e8af8a 100644 --- a/core/src/main/java/org/apache/accumulo/core/fate/user/UserFateStore.java +++ b/core/src/main/java/org/apache/accumulo/core/fate/user/UserFateStore.java @@ -20,7 +20,6 @@ import java.io.IOException; import java.io.Serializable; -import java.time.Duration; import java.util.EnumSet; import java.util.List; import java.util.Map.Entry; @@ -30,6 +29,7 @@ import java.util.UUID; import java.util.function.Function; import java.util.function.Predicate; +import 
java.util.function.Supplier; import java.util.stream.Collectors; import java.util.stream.Stream; @@ -106,7 +106,7 @@ public FateId create() { UtilWaitThread.sleep(100); } - var status = newMutator(fateId).requireStatus().putStatus(TStatus.NEW) + var status = newMutator(fateId).requireAbsent().putStatus(TStatus.NEW) .putCreateTime(System.currentTimeMillis()).tryMutate(); switch (status) { @@ -123,104 +123,54 @@ public FateId create() { } public FateId getFateId() { - return FateId.from(fateInstanceType, UUID.randomUUID()); + return fateIdGenerator.newRandomId(type()); } @Override - public Optional> createAndReserve(FateKey fateKey) { - final var reservation = FateReservation.from(lockID, UUID.randomUUID()); + public Optional seedTransaction(String txName, FateKey fateKey, Repo repo, + boolean autoCleanUp) { final var fateId = fateIdGenerator.fromTypeAndKey(type(), fateKey); - Optional> txStore = Optional.empty(); - int maxAttempts = 5; - FateMutator.Status status = null; - - // Only need to retry if it is UNKNOWN - for (int attempt = 0; attempt < maxAttempts; attempt++) { - status = newMutator(fateId).requireStatus().putStatus(TStatus.NEW).putKey(fateKey) - .putReservedTx(reservation).putCreateTime(System.currentTimeMillis()).tryMutate(); - if (status != FateMutator.Status.UNKNOWN) { - break; - } - UtilWaitThread.sleep(100); + Supplier> mutatorFactory = () -> newMutator(fateId).requireAbsent() + .putKey(fateKey).putCreateTime(System.currentTimeMillis()); + if (seedTransaction(mutatorFactory, fateKey + " " + fateId, txName, repo, autoCleanUp)) { + return Optional.of(fateId); + } else { + return Optional.empty(); } + } - switch (status) { - case ACCEPTED: - txStore = Optional.of(new FateTxStoreImpl(fateId, reservation)); - break; - case REJECTED: - // If the status is REJECTED, we need to check what about the mutation was REJECTED: - // 1) Possible something like the following occurred: - // the first attempt was UNKNOWN but written, the next attempt would be rejected 
- // We return the FateTxStore in this case. - // 2) If there is a collision with existing fate id, throw error - // 3) If the fate id is already reserved, return an empty optional - // 4) If the fate id is still NEW/unseeded and unreserved, we can try to reserve it - try (Scanner scanner = context.createScanner(tableName, Authorizations.EMPTY)) { - scanner.setRange(getRow(fateId)); - scanner.fetchColumn(TxColumnFamily.STATUS_COLUMN.getColumnFamily(), - TxColumnFamily.STATUS_COLUMN.getColumnQualifier()); - scanner.fetchColumn(TxColumnFamily.TX_KEY_COLUMN.getColumnFamily(), - TxColumnFamily.TX_KEY_COLUMN.getColumnQualifier()); - scanner.fetchColumn(TxColumnFamily.RESERVATION_COLUMN.getColumnFamily(), - TxColumnFamily.RESERVATION_COLUMN.getColumnQualifier()); - TStatus statusSeen = TStatus.UNKNOWN; - Optional fateKeySeen = Optional.empty(); - Optional reservationSeen = Optional.empty(); - - for (Entry entry : scanner) { - Text colf = entry.getKey().getColumnFamily(); - Text colq = entry.getKey().getColumnQualifier(); - Value val = entry.getValue(); - - switch (colq.toString()) { - case TxColumnFamily.STATUS: - statusSeen = TStatus.valueOf(val.toString()); - break; - case TxColumnFamily.TX_KEY: - fateKeySeen = Optional.of(FateKey.deserialize(val.get())); - break; - case TxColumnFamily.RESERVATION: - reservationSeen = Optional.of(FateReservation.deserialize(val.get())); - break; - default: - throw new IllegalStateException("Unexpected column seen: " + colf + ":" + colq); - } - } + @Override + public boolean seedTransaction(String txName, FateId fateId, Repo repo, boolean autoCleanUp) { + Supplier> mutatorFactory = + () -> newMutator(fateId).requireStatus(TStatus.NEW).requireUnreserved().requireAbsentKey(); + return seedTransaction(mutatorFactory, fateId.canonical(), txName, repo, autoCleanUp); + } - if (statusSeen == TStatus.NEW) { - verifyFateKey(fateId, fateKeySeen, fateKey); - // This will be the case if the mutation status is REJECTED but the mutation was written - 
if (reservationSeen.isPresent() && reservationSeen.orElseThrow().equals(reservation)) { - txStore = Optional.of(new FateTxStoreImpl(fateId, reservation)); - } else if (reservationSeen.isEmpty()) { - // NEW/unseeded transaction and not reserved, so we can allow it to be reserved - // we tryReserve() since another thread may have reserved it since the scan - txStore = tryReserve(fateId); - // the status was known before reserving to be NEW, - // however it could change so check after reserving to avoid race conditions. - var statusAfterReserve = - txStore.map(ReadOnlyFateTxStore::getStatus).orElse(TStatus.UNKNOWN); - if (statusAfterReserve != TStatus.NEW) { - txStore.ifPresent(txs -> txs.unreserve(Duration.ZERO)); - txStore = Optional.empty(); - } - } - } else { - log.trace( - "fate id {} tstatus {} fate key {} is reserved {} " - + "has already been seeded with work (non-NEW status)", - fateId, statusSeen, fateKeySeen.orElse(null), reservationSeen.isPresent()); - } - } catch (TableNotFoundException e) { - throw new IllegalStateException(tableName + " not found!", e); - } - break; - default: - throw new IllegalStateException("Unknown or unexpected status " + status); + private boolean seedTransaction(Supplier> mutatorFactory, String logId, + String txName, Repo repo, boolean autoCleanUp) { + int maxAttempts = 5; + for (int attempt = 0; attempt < maxAttempts; attempt++) { + var mutator = mutatorFactory.get(); + mutator = + mutator.putName(serializeTxInfo(txName)).putRepo(1, repo).putStatus(TStatus.SUBMITTED); + if (autoCleanUp) { + mutator = mutator.putAutoClean(serializeTxInfo(autoCleanUp)); + } + var status = mutator.tryMutate(); + if (status == FateMutator.Status.ACCEPTED) { + // signal to the super class that a new fate transaction was seeded and is ready to run + seededTx(); + return true; + } else if (status == FateMutator.Status.REJECTED) { + return false; + } else if (status == FateMutator.Status.UNKNOWN) { + log.debug("Attempt to seed {} returned {} status, 
retrying", logId, status); + UtilWaitThread.sleep(250); + } } - return txStore; + log.warn("Repeatedly received unknown status when attempting to seed {}", logId); + return false; } @Override diff --git a/core/src/main/java/org/apache/accumulo/core/fate/zookeeper/MetaFateStore.java b/core/src/main/java/org/apache/accumulo/core/fate/zookeeper/MetaFateStore.java index d6da05e844f..33b9fafd65a 100644 --- a/core/src/main/java/org/apache/accumulo/core/fate/zookeeper/MetaFateStore.java +++ b/core/src/main/java/org/apache/accumulo/core/fate/zookeeper/MetaFateStore.java @@ -20,12 +20,15 @@ import static com.google.common.util.concurrent.Uninterruptibles.sleepUninterruptibly; import static java.util.concurrent.TimeUnit.MILLISECONDS; +import static org.apache.accumulo.core.fate.ReadOnlyFateStore.TStatus.NEW; +import static org.apache.accumulo.core.fate.ReadOnlyFateStore.TStatus.SUBMITTED; import java.io.ByteArrayOutputStream; import java.io.DataOutputStream; import java.io.IOException; import java.io.Serializable; import java.io.UncheckedIOException; +import java.time.Duration; import java.util.ArrayList; import java.util.Collections; import java.util.EnumSet; @@ -100,7 +103,7 @@ public MetaFateStore(String path, ZooReaderWriter zk, ZooUtil.LockID lockID, public FateId create() { while (true) { try { - FateId fateId = FateId.from(fateInstanceType, UUID.randomUUID()); + FateId fateId = fateIdGenerator.newRandomId(fateInstanceType); zk.putPersistentData(getTXPath(fateId), new NodeValue(TStatus.NEW, null).serialize(), NodeExistsPolicy.FAIL); return fateId; @@ -112,8 +115,7 @@ public FateId create() { } } - @Override - public Optional> createAndReserve(FateKey fateKey) { + private Optional> createAndReserve(FateKey fateKey) { final var reservation = FateReservation.from(lockID, UUID.randomUUID()); final var fateId = fateIdGenerator.fromTypeAndKey(type(), fateKey); @@ -161,6 +163,52 @@ public Optional> createAndReserve(FateKey fateKey) { } } + @Override + public Optional 
seedTransaction(String txName, FateKey fateKey, Repo repo, + boolean autoCleanUp) { + return createAndReserve(fateKey).map(txStore -> { + try { + seedTransaction(txName, repo, autoCleanUp, txStore); + return txStore.getID(); + } finally { + txStore.unreserve(Duration.ZERO); + } + }); + } + + @Override + public boolean seedTransaction(String txName, FateId fateId, Repo repo, boolean autoCleanUp) { + return tryReserve(fateId).map(txStore -> { + try { + if (txStore.getStatus() == NEW) { + seedTransaction(txName, repo, autoCleanUp, txStore); + return true; + } + return false; + } finally { + txStore.unreserve(Duration.ZERO); + } + }).orElseThrow(); + } + + protected void seedTransaction(String txName, Repo repo, boolean autoCleanUp, + FateTxStore txStore) { + if (txStore.top() == null) { + try { + txStore.push(repo); + } catch (StackOverflowException e) { + // this should not happen + throw new IllegalStateException(e); + } + } + + if (autoCleanUp) { + txStore.setTransactionInfo(TxInfo.AUTO_CLEAN, autoCleanUp); + } + txStore.setTransactionInfo(TxInfo.TX_NAME, txName); + txStore.setStatus(SUBMITTED); + } + @Override public Optional> tryReserve(FateId fateId) { // uniquely identify this attempt to reserve the fate operation data diff --git a/core/src/main/java/org/apache/accumulo/core/logging/FateLogger.java b/core/src/main/java/org/apache/accumulo/core/logging/FateLogger.java index 4a9f2517c01..3fb42014aff 100644 --- a/core/src/main/java/org/apache/accumulo/core/logging/FateLogger.java +++ b/core/src/main/java/org/apache/accumulo/core/logging/FateLogger.java @@ -149,6 +149,33 @@ public FateId create() { return fateId; } + @Override + public Optional seedTransaction(String txName, FateKey fateKey, Repo repo, + boolean autoCleanUp) { + var optional = store.seedTransaction(txName, fateKey, repo, autoCleanUp); + if (storeLog.isTraceEnabled()) { + optional.ifPresentOrElse(fateId -> { + storeLog.trace("{} seeded {} {} {}", fateId, fateKey, toLogString.apply(repo), + 
autoCleanUp); + }, () -> { + storeLog.trace("Unable to seed {} {} {}", fateKey, toLogString.apply(repo), + autoCleanUp); + }); + } + return optional; + } + + @Override + public boolean seedTransaction(String txName, FateId fateId, Repo repo, + boolean autoCleanUp) { + boolean seeded = store.seedTransaction(txName, fateId, repo, autoCleanUp); + if (storeLog.isTraceEnabled()) { + storeLog.trace("{} {} {} {}", fateId, seeded ? "seeded" : "unable to seed", + toLogString.apply(repo), autoCleanUp); + } + return seeded; + } + @Override public int getDeferredCount() { return store.getDeferredCount(); @@ -164,22 +191,6 @@ public boolean isDeferredOverflow() { return store.isDeferredOverflow(); } - @Override - public Optional> createAndReserve(FateKey fateKey) { - Optional> txStore = store.createAndReserve(fateKey); - if (storeLog.isTraceEnabled()) { - if (txStore.isPresent()) { - storeLog.trace("{} created and reserved fate transaction using key : {}", - txStore.orElseThrow().getID(), fateKey); - } else { - storeLog.trace( - "fate transaction was not created using key : {}, existing transaction exists", - fateKey); - } - } - return txStore; - } - @Override public Map getActiveReservations() { return store.getActiveReservations(); diff --git a/core/src/test/java/org/apache/accumulo/core/fate/TestStore.java b/core/src/test/java/org/apache/accumulo/core/fate/TestStore.java index 2c54464663f..40d0d755b13 100644 --- a/core/src/test/java/org/apache/accumulo/core/fate/TestStore.java +++ b/core/src/test/java/org/apache/accumulo/core/fate/TestStore.java @@ -53,8 +53,15 @@ public FateId create() { } @Override - public Optional> createAndReserve(FateKey key) { - throw new UnsupportedOperationException(); + public Optional seedTransaction(String txName, FateKey fateKey, Repo repo, + boolean autoCleanUp) { + return Optional.empty(); + } + + @Override + public boolean seedTransaction(String txName, FateId fateId, Repo repo, + boolean autoCleanUp) { + return false; } @Override diff --git 
a/server/base/src/main/java/org/apache/accumulo/server/metadata/iterators/SetEncodingIterator.java b/server/base/src/main/java/org/apache/accumulo/server/metadata/iterators/SetEncodingIterator.java index b0456afd32a..ebe732049f1 100644 --- a/server/base/src/main/java/org/apache/accumulo/server/metadata/iterators/SetEncodingIterator.java +++ b/server/base/src/main/java/org/apache/accumulo/server/metadata/iterators/SetEncodingIterator.java @@ -79,7 +79,7 @@ static Text getTabletRow(Range range) { // expecting this range to cover a single metadata row, so validate the range meets expectations MetadataSchema.TabletsSection.validateRow(row); Preconditions.checkArgument(row.equals(range.getEndKey().getRow())); - return range.getStartKey().getRow(); + return row; } @Override diff --git a/test/src/main/java/org/apache/accumulo/test/compaction/ExternalCompaction_1_IT.java b/test/src/main/java/org/apache/accumulo/test/compaction/ExternalCompaction_1_IT.java index e8955e465ab..c8905cd850a 100644 --- a/test/src/main/java/org/apache/accumulo/test/compaction/ExternalCompaction_1_IT.java +++ b/test/src/main/java/org/apache/accumulo/test/compaction/ExternalCompaction_1_IT.java @@ -78,6 +78,7 @@ import org.apache.accumulo.core.fate.FateInstanceType; import org.apache.accumulo.core.fate.FateKey; import org.apache.accumulo.core.fate.FateStore; +import org.apache.accumulo.core.fate.Repo; import org.apache.accumulo.core.fate.user.UserFateStore; import org.apache.accumulo.core.fate.zookeeper.MetaFateStore; import org.apache.accumulo.core.iterators.DevNull; @@ -96,6 +97,7 @@ import org.apache.accumulo.harness.MiniClusterConfigurationCallback; import org.apache.accumulo.harness.SharedMiniClusterBase; import org.apache.accumulo.manager.Manager; +import org.apache.accumulo.manager.tableOps.ManagerRepo; import org.apache.accumulo.minicluster.ServerType; import org.apache.accumulo.miniclusterImpl.MiniAccumuloConfigImpl; import org.apache.accumulo.server.util.FindCompactionTmpFiles; @@ -332,6 
+334,21 @@ public void testCompactionCommitAndDeadDetectionAll() throws Exception { } } + public static class FakeRepo extends ManagerRepo { + + private static final long serialVersionUID = 1234L; + + @Override + public long isReady(FateId fateId, Manager environment) throws Exception { + return 1000; + } + + @Override + public Repo call(FateId fateId, Manager environment) throws Exception { + return null; + } + } + private FateId createCompactionCommitAndDeadMetadata(AccumuloClient c, FateStore fateStore, String tableName, Map> allCids) throws Exception { @@ -345,10 +362,9 @@ private FateId createCompactionCommitAndDeadMetadata(AccumuloClient c, // Create a fate transaction for one of the compaction ids that is in the new state, it // should never run. Its purpose is to prevent the dead compaction detector // from deleting the id. - FateStore.FateTxStore fateTx = fateStore - .createAndReserve(FateKey.forCompactionCommit(allCids.get(tableId).get(0))).orElseThrow(); - var fateId = fateTx.getID(); - fateTx.unreserve(Duration.ZERO); + Repo repo = new FakeRepo(); + var fateId = fateStore.seedTransaction("COMPACTION_COMMIT", + FateKey.forCompactionCommit(allCids.get(tableId).get(0)), repo, true).orElseThrow(); // Read the tablet metadata var tabletsMeta = ctx.getAmple().readTablets().forTable(tableId).build().stream() diff --git a/test/src/main/java/org/apache/accumulo/test/fate/FateIT.java b/test/src/main/java/org/apache/accumulo/test/fate/FateIT.java index d36e98bdecb..13e9e23abfb 100644 --- a/test/src/main/java/org/apache/accumulo/test/fate/FateIT.java +++ b/test/src/main/java/org/apache/accumulo/test/fate/FateIT.java @@ -74,6 +74,10 @@ public static class TestRepo implements Repo { private final String data; + public TestRepo() { + this("test"); + } + public TestRepo(String data) { this.data = data; } diff --git a/test/src/main/java/org/apache/accumulo/test/fate/FateStoreIT.java b/test/src/main/java/org/apache/accumulo/test/fate/FateStoreIT.java index 
64607cab7b6..f389564dc53 100644 --- a/test/src/main/java/org/apache/accumulo/test/fate/FateStoreIT.java +++ b/test/src/main/java/org/apache/accumulo/test/fate/FateStoreIT.java @@ -25,10 +25,10 @@ import static org.junit.jupiter.api.Assertions.assertNotEquals; import static org.junit.jupiter.api.Assertions.assertNotNull; import static org.junit.jupiter.api.Assertions.assertNull; -import static org.junit.jupiter.api.Assertions.assertThrows; import static org.junit.jupiter.api.Assertions.assertTrue; import java.time.Duration; +import java.util.ArrayList; import java.util.EnumSet; import java.util.HashMap; import java.util.HashSet; @@ -40,6 +40,7 @@ import java.util.concurrent.Executors; import java.util.concurrent.Future; import java.util.concurrent.atomic.AtomicBoolean; +import java.util.concurrent.atomic.AtomicInteger; import java.util.stream.Collectors; import org.apache.accumulo.core.data.TableId; @@ -47,6 +48,7 @@ import org.apache.accumulo.core.fate.AbstractFateStore; import org.apache.accumulo.core.fate.Fate.TxInfo; import org.apache.accumulo.core.fate.FateId; +import org.apache.accumulo.core.fate.FateInstanceType; import org.apache.accumulo.core.fate.FateKey; import org.apache.accumulo.core.fate.FateStore; import org.apache.accumulo.core.fate.FateStore.FateTxStore; @@ -296,18 +298,20 @@ protected void testCreateWithKey(FateStore store, ServerContext sctx) { FateKey fateKey2 = FateKey.forCompactionCommit(ExternalCompactionId.generate(UUID.randomUUID())); - FateTxStore txStore1 = store.createAndReserve(fateKey1).orElseThrow(); - FateTxStore txStore2 = store.createAndReserve(fateKey2).orElseThrow(); + var fateId1 = store.seedTransaction("TEST", fateKey1, new TestRepo(), true).orElseThrow(); + var fateId2 = store.seedTransaction("TEST", fateKey2, new TestRepo(), true).orElseThrow(); - assertNotEquals(txStore1.getID(), txStore2.getID()); + assertNotEquals(fateId1, fateId2); + var txStore1 = store.reserve(fateId1); + var txStore2 = store.reserve(fateId2); try { 
assertTrue(txStore1.timeCreated() > 0); - assertEquals(TStatus.NEW, txStore1.getStatus()); + assertEquals(TStatus.SUBMITTED, txStore1.getStatus()); assertEquals(fateKey1, txStore1.getKey().orElseThrow()); assertTrue(txStore2.timeCreated() > 0); - assertEquals(TStatus.NEW, txStore2.getStatus()); + assertEquals(TStatus.SUBMITTED, txStore2.getStatus()); assertEquals(fateKey2, txStore2.getKey().orElseThrow()); assertEquals(2, store.list().count()); @@ -332,14 +336,16 @@ protected void testCreateWithKeyDuplicate(FateStore store, ServerContex // A second call to createAndReserve() should just return an empty optional // since it's already in reserved and in progress FateKey fateKey = FateKey.forSplit(ke); - FateTxStore txStore = store.createAndReserve(fateKey).orElseThrow(); + var fateId = store.seedTransaction("TEST", fateKey, new TestRepo(), true).orElseThrow(); // second call is empty - assertTrue(store.createAndReserve(fateKey).isEmpty()); + assertTrue(store.seedTransaction("TEST", fateKey, new TestRepo(), true).isEmpty()); + assertFalse(store.seedTransaction("TEST", fateId, new TestRepo(), true)); + var txStore = store.reserve(fateId); try { assertTrue(txStore.timeCreated() > 0); - assertEquals(TStatus.NEW, txStore.getStatus()); + assertEquals(TStatus.SUBMITTED, txStore.getStatus()); assertEquals(fateKey, txStore.getKey().orElseThrow()); assertEquals(1, store.list().count()); } finally { @@ -359,15 +365,16 @@ protected void testCreateWithKeyInProgress(FateStore store, ServerConte new KeyExtent(TableId.of(getUniqueNames(1)[0]), new Text("zzz"), new Text("aaa")); FateKey fateKey = FateKey.forSplit(ke); - FateTxStore txStore = store.createAndReserve(fateKey).orElseThrow(); + var fateId = store.seedTransaction("TEST", fateKey, new TestRepo(), true).orElseThrow(); + var txStore = store.reserve(fateId); try { assertTrue(txStore.timeCreated() > 0); txStore.setStatus(TStatus.IN_PROGRESS); // We have an existing transaction with the same key in progress // so should return 
an empty Optional - assertTrue(store.createAndReserve(fateKey).isEmpty()); + assertTrue(store.seedTransaction("TEST", fateKey, new TestRepo(), true).isEmpty()); assertEquals(TStatus.IN_PROGRESS, txStore.getStatus()); } finally { txStore.setStatus(TStatus.SUCCESSFUL); @@ -375,14 +382,20 @@ protected void testCreateWithKeyInProgress(FateStore store, ServerConte txStore.unreserve(Duration.ZERO); } + txStore = null; + try { // After deletion, make sure we can create again with the same key - txStore = store.createAndReserve(fateKey).orElseThrow(); + var fateId2 = store.seedTransaction("TEST", fateKey, new TestRepo(), true).orElseThrow(); + txStore = store.reserve(fateId); + assertEquals(fateId, fateId2); assertTrue(txStore.timeCreated() > 0); - assertEquals(TStatus.NEW, txStore.getStatus()); + assertEquals(TStatus.SUBMITTED, txStore.getStatus()); } finally { - txStore.delete(); - txStore.unreserve(Duration.ZERO); + if (txStore != null) { + txStore.delete(); + txStore.unreserve(Duration.ZERO); + } } } @@ -392,8 +405,18 @@ public void testCreateWithKeyCollision() throws Exception { // Replace the default hashing algorithm with one that always returns the same tid so // we can check duplicate detection with different keys executeTest(this::testCreateWithKeyCollision, AbstractFateStore.DEFAULT_MAX_DEFERRED, - (instanceType, fateKey) -> FateId.from(instanceType, - UUID.nameUUIDFromBytes("testing uuid".getBytes(UTF_8)))); + new AbstractFateStore.FateIdGenerator() { + @Override + public FateId fromTypeAndKey(FateInstanceType instanceType, FateKey fateKey) { + return FateId.from(instanceType, + UUID.nameUUIDFromBytes("testing uuid".getBytes(UTF_8))); + } + + @Override + public FateId newRandomId(FateInstanceType instanceType) { + return FateId.from(instanceType, UUID.randomUUID()); + } + }); } protected void testCreateWithKeyCollision(FateStore store, ServerContext sctx) { @@ -404,13 +427,10 @@ protected void testCreateWithKeyCollision(FateStore store, ServerContex FateKey 
fateKey1 = FateKey.forSplit(ke1); FateKey fateKey2 = FateKey.forSplit(ke2); - FateTxStore txStore = store.createAndReserve(fateKey1).orElseThrow(); + var fateId1 = store.seedTransaction("TEST", fateKey1, new TestRepo(), true).orElseThrow(); + var txStore = store.reserve(fateId1); try { - var e = assertThrows(IllegalStateException.class, () -> store.createAndReserve(fateKey2)); - assertEquals( - "Collision detected for fate id " - + FateId.from(store.type(), UUID.nameUUIDFromBytes("testing uuid".getBytes(UTF_8))), - e.getMessage()); + assertTrue(store.seedTransaction("TEST", fateKey2, new TestRepo(), true).isEmpty()); assertEquals(fateKey1, txStore.getKey().orElseThrow()); } finally { txStore.delete(); @@ -430,26 +450,191 @@ protected void testCollisionWithRandomFateId(FateStore store, ServerCon new KeyExtent(TableId.of(getUniqueNames(1)[0]), new Text("zzz"), new Text("aaa")); FateKey fateKey = FateKey.forSplit(ke); - FateTxStore txStore = store.createAndReserve(fateKey).orElseThrow(); - FateId fateId = txStore.getID(); + var fateId = store.seedTransaction("TEST", fateKey, new TestRepo(), true).orElseThrow(); // After createAndReserve a fate transaction using a key we can simulate a collision with // a random FateId by deleting the key out of Fate and calling createAndReserve again to // verify it detects the key is missing. Then we can continue and see if we can still use // the existing transaction. 
deleteKey(fateId, sctx); - var e = assertThrows(IllegalStateException.class, () -> store.createAndReserve(fateKey)); - assertEquals("fate key is missing from fate id " + fateId, e.getMessage()); + assertTrue(store.seedTransaction("TEST", fateKey, new TestRepo(), true).isEmpty()); + var txStore = store.reserve(fateId); // We should still be able to use the existing transaction try { assertTrue(txStore.timeCreated() > 0); + assertEquals(TStatus.SUBMITTED, txStore.getStatus()); + } finally { + txStore.delete(); + txStore.unreserve(Duration.ZERO); + } + } + + public static final UUID DUPLICATE_UUID = UUID.randomUUID(); + + public static final List UUIDS = List.of(DUPLICATE_UUID, DUPLICATE_UUID, UUID.randomUUID()); + + @Test + public void testCreate() throws Exception { + AtomicInteger index = new AtomicInteger(0); + executeTest(this::testCreate, AbstractFateStore.DEFAULT_MAX_DEFERRED, + new AbstractFateStore.FateIdGenerator() { + @Override + public FateId fromTypeAndKey(FateInstanceType instanceType, FateKey fateKey) { + return FateId.from(instanceType, + UUID.nameUUIDFromBytes("testing uuid".getBytes(UTF_8))); + } + + @Override + public FateId newRandomId(FateInstanceType instanceType) { + return FateId.from(instanceType, UUIDS.get(index.getAndIncrement() % UUIDS.size())); + } + }); + } + + protected void testCreate(FateStore store, ServerContext sctx) throws Exception { + + var fateId1 = store.create(); + assertEquals(UUIDS.get(0), fateId1.getTxUUID()); + + // This UUIDS[1] should collide with UUIDS[0] and then the code should retry and end up UUIDS[2] + var fateId2 = store.create(); + assertEquals(UUIDS.get(2), fateId2.getTxUUID()); + + for (var fateId : List.of(fateId1, fateId2)) { + var txStore = store.reserve(fateId); + try { + assertEquals(TStatus.NEW, txStore.getStatus()); + assertTrue(txStore.timeCreated() > 0); + assertNull(txStore.top()); + assertTrue(txStore.getKey().isEmpty()); + assertEquals(fateId, txStore.getID()); + 
assertTrue(txStore.getStack().isEmpty()); + } finally { + txStore.unreserve(Duration.ZERO); + } + } + + assertEquals(Set.of(fateId1, fateId2), + store.list().map(FateIdStatus::getFateId).collect(Collectors.toSet())); + + var txStore = store.reserve(fateId2); + try { + txStore.delete(); + } finally { + txStore.unreserve(Duration.ZERO); + } + + assertEquals(Set.of(fateId1), + store.list().map(FateIdStatus::getFateId).collect(Collectors.toSet())); + + txStore = store.reserve(fateId1); + try { + txStore.setStatus(TStatus.SUBMITTED); + txStore.setStatus(TStatus.IN_PROGRESS); + txStore.push(new TestRepo()); + } finally { + txStore.unreserve(Duration.ZERO); + } + + assertEquals(Set.of(fateId1), + store.list().map(FateIdStatus::getFateId).collect(Collectors.toSet())); + + // should collide again with the first fate id and go to the second + fateId2 = store.create(); + assertEquals(UUIDS.get(2), fateId2.getTxUUID()); + + assertEquals(Set.of(fateId1, fateId2), + store.list().map(FateIdStatus::getFateId).collect(Collectors.toSet())); + + // ensure fateId1 was not altered in anyway by creating fateid2 when it collided + txStore = store.reserve(fateId1); + try { + assertEquals(TStatus.IN_PROGRESS, txStore.getStatus()); + txStore.forceDelete(); + } finally { + txStore.unreserve(Duration.ZERO); + } + + assertEquals(Set.of(fateId2), + store.list().map(FateIdStatus::getFateId).collect(Collectors.toSet())); + + txStore = store.reserve(fateId2); + try { assertEquals(TStatus.NEW, txStore.getStatus()); + txStore.delete(); } finally { + txStore.unreserve(Duration.ZERO); + } + + // should be able to recreate something at the same id + fateId1 = store.create(); + assertEquals(UUIDS.get(0), fateId1.getTxUUID()); + txStore = store.reserve(fateId1); + try { + assertEquals(TStatus.NEW, txStore.getStatus()); + assertTrue(txStore.timeCreated() > 0); + assertNull(txStore.top()); + assertTrue(txStore.getKey().isEmpty()); + assertEquals(fateId1, txStore.getID()); + 
assertTrue(txStore.getStack().isEmpty()); txStore.delete(); + } finally { txStore.unreserve(Duration.ZERO); } + assertEquals(Set.of(), store.list().map(FateIdStatus::getFateId).collect(Collectors.toSet())); + + } + + @Test + public void testConcurrent() throws Exception { + executeTest(this::testConcurrent); + } + + protected void testConcurrent(FateStore store, ServerContext sctx) throws Exception { + KeyExtent ke = + new KeyExtent(TableId.of(getUniqueNames(1)[0]), new Text("zzz"), new Text("aaa")); + FateKey fateKey = FateKey.forSplit(ke); + + var executor = Executors.newFixedThreadPool(10); + try { + // have 10 threads all try to seed the same fate key, only one should succeed. + List>> futures = new ArrayList<>(10); + for (int i = 0; i < 10; i++) { + futures.add( + executor.submit(() -> store.seedTransaction("TEST", fateKey, new TestRepo(), true))); + } + + int idsSeen = 0; + for (var future : futures) { + if (future.get().isPresent()) { + idsSeen++; + } + } + + assertEquals(1, idsSeen); + assertEquals(1, store.list(FateKey.FateKeyType.SPLIT).count()); + assertEquals(0, store.list(FateKey.FateKeyType.COMPACTION_COMMIT).count()); + + for (var future : futures) { + if (future.get().isPresent()) { + var txStore = store.reserve(future.get().orElseThrow()); + try { + txStore.delete(); + } finally { + txStore.unreserve(Duration.ZERO); + } + } + } + + assertEquals(0, store.list(FateKey.FateKeyType.SPLIT).count()); + assertEquals(0, store.list(FateKey.FateKeyType.COMPACTION_COMMIT).count()); + + } finally { + executor.shutdown(); + } + } @Test @@ -500,9 +685,8 @@ protected void testListFateKeys(FateStore store, ServerContext sctx) th Map fateKeyIds = new HashMap<>(); for (FateKey fateKey : List.of(fateKey1, fateKey2, fateKey3, fateKey4)) { - var fateTx = store.createAndReserve(fateKey).orElseThrow(); - fateKeyIds.put(fateKey, fateTx.getID()); - fateTx.unreserve(Duration.ZERO); + var fateId = store.seedTransaction("TEST", fateKey, new TestRepo(), true).orElseThrow(); + 
fateKeyIds.put(fateKey, fateId); } HashSet allIds = new HashSet<>();