Skip to content

Commit

Permalink
Merge remote-tracking branch 'apache/master' into queryResultPusher-c…
Browse files Browse the repository at this point in the history
…ause-fix
  • Loading branch information
kgyrtkirk committed Sep 18, 2023
2 parents ee1e1fa + 973fbaf commit 237065d
Show file tree
Hide file tree
Showing 35 changed files with 1,335 additions and 223 deletions.
8 changes: 4 additions & 4 deletions .github/labeler.yml
Original file line number Diff line number Diff line change
Expand Up @@ -46,9 +46,12 @@
- 'processing/src/main/java/org/apache/druid/java/util/emitter/**'
- 'extensions-contrib/*-emitter/**'

'Area - MSQ':
- 'extensions-core/multi-stage-query/**'

'Area - Querying':
- 'sql/**'
- 'extensions-core/multi-stage-query/**'
- 'extensions-core/multi-stage-query/src/main/java/org/apache/druid/msq/sql/**'

'Area - Segment Format and Ser/De':
- 'processing/src/main/java/org/apache/druid/segment/**'
Expand All @@ -62,6 +65,3 @@

'Kubernetes':
- 'extensions-contrib/kubernetes-overlord-extensions/**'

'MSQ':
- 'extensions-core/multi-stage-query/**'
2 changes: 1 addition & 1 deletion doap_Druid.rdf
Original file line number Diff line number Diff line change
Expand Up @@ -22,7 +22,7 @@
limitations under the License.
-->
<Project rdf:about="https://druid.apache.org/">
<created>2023-09-08</created>
<created>2012-10-23</created>
<license rdf:resource="https://spdx.org/licenses/Apache-2.0" />
<name>Apache Druid</name>
<homepage rdf:resource="https://druid.apache.org/" />
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -43,6 +43,7 @@
import org.apache.druid.indexing.common.task.Tasks;
import org.apache.druid.java.util.common.ISE;
import org.apache.druid.java.util.common.StringUtils;
import org.apache.druid.java.util.common.logger.Logger;
import org.apache.druid.msq.exec.Controller;
import org.apache.druid.msq.exec.ControllerContext;
import org.apache.druid.msq.exec.ControllerImpl;
Expand All @@ -69,6 +70,7 @@ public class MSQControllerTask extends AbstractTask implements ClientTaskQuery
{
public static final String TYPE = "query_controller";
public static final String DUMMY_DATASOURCE_FOR_SELECT = "__query_select";
private static final Logger log = new Logger(MSQControllerTask.class);

private final MSQSpec querySpec;

Expand Down Expand Up @@ -204,7 +206,7 @@ public boolean isReady(TaskActionClient taskActionClient) throws Exception
if (isIngestion(querySpec) && ((DataSourceMSQDestination) querySpec.getDestination()).isReplaceTimeChunks()) {
final List<Interval> intervals =
((DataSourceMSQDestination) querySpec.getDestination()).getReplaceTimeChunks();

log.debug("Task[%s] trying to acquire[%s] locks for intervals[%s] to become ready", getId(), TaskLockType.EXCLUSIVE, intervals);
for (final Interval interval : intervals) {
final TaskLock taskLock =
taskActionClient.submit(new TimeChunkLockTryAcquireAction(TaskLockType.EXCLUSIVE, interval));
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -1033,14 +1033,15 @@ public void testReaderRetriesOnSdkClientExceptionButNeverSucceedsThenThrows() th
new CsvInputFormat(ImmutableList.of("time", "dim1", "dim2"), "|", false, null, 0),
temporaryFolder.newFolder()
);

final IllegalStateException e = Assert.assertThrows(IllegalStateException.class, reader::read);
MatcherAssert.assertThat(e.getCause(), CoreMatchers.instanceOf(IOException.class));
MatcherAssert.assertThat(e.getCause().getCause(), CoreMatchers.instanceOf(SdkClientException.class));
MatcherAssert.assertThat(
e.getCause().getCause().getMessage(),
CoreMatchers.startsWith("Data read has a different length than the expected")
);
try (CloseableIterator<InputRow> readerIterator = reader.read()) {
final IllegalStateException e = Assert.assertThrows(IllegalStateException.class, readerIterator::hasNext);
MatcherAssert.assertThat(e.getCause(), CoreMatchers.instanceOf(IOException.class));
MatcherAssert.assertThat(e.getCause().getCause(), CoreMatchers.instanceOf(SdkClientException.class));
MatcherAssert.assertThat(
e.getCause().getCause().getMessage(),
CoreMatchers.startsWith("Data read has a different length than the expected")
);
}

EasyMock.verify(S3_CLIENT);
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -494,9 +494,12 @@ public List<SegmentAllocateResult> allocateSegments(
allocateSegmentIds(dataSource, interval, skipSegmentLineageCheck, holderList.getPending());
holderList.getPending().forEach(holder -> acquireTaskLock(holder, false));
}

holderList.getPending().forEach(holder -> addTaskAndPersistLocks(holder, isTimeChunkLock));
}
catch (Exception e) {
holderList.clearStaleLocks(this);
throw e;
}
finally {
giant.unlock();
}
Expand Down Expand Up @@ -711,7 +714,8 @@ private TaskLockPosse createNewTaskLockPosse(LockRequest request)
* for the given requests. Updates the holder with the allocated segment if
* the allocation succeeds, otherwise marks it as failed.
*/
private void allocateSegmentIds(
@VisibleForTesting
void allocateSegmentIds(
String dataSource,
Interval interval,
boolean skipSegmentLineageCheck,
Expand Down Expand Up @@ -1598,6 +1602,28 @@ Set<SegmentAllocationHolder> getPending()
return pending;
}

/**
* When task locks are acquired in an attempt to allocate segments, * a new lock posse might be created.
* However, the posse is associated with the task only after all the segment allocations have succeeded.
* If there is an exception, unlock all such unassociated locks.
*/
void clearStaleLocks(TaskLockbox taskLockbox)
{
all
.stream()
.filter(holder -> holder.acquiredLock != null
&& holder.taskLockPosse != null
&& !holder.taskLockPosse.containsTask(holder.task))
.forEach(holder -> {
holder.taskLockPosse.addTask(holder.task);
taskLockbox.unlock(
holder.task,
holder.acquiredLock.getInterval(),
holder.acquiredLock instanceof SegmentLock ? ((SegmentLock) holder.acquiredLock).getPartitionId() : null
);
log.info("Cleared stale lock[%s] for task[%s]", holder.acquiredLock, holder.task.getId());
});
}

List<SegmentAllocateResult> getResults()
{
Expand All @@ -1608,7 +1634,8 @@ List<SegmentAllocateResult> getResults()
/**
* Contains the task, request, lock and final result for a segment allocation.
*/
private static class SegmentAllocationHolder
@VisibleForTesting
static class SegmentAllocationHolder
{
final AllocationHolderList list;

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -25,6 +25,7 @@
import com.fasterxml.jackson.databind.ObjectMapper;
import com.fasterxml.jackson.databind.jsontype.NamedType;
import com.fasterxml.jackson.databind.module.SimpleModule;
import com.google.common.collect.ImmutableList;
import com.google.common.collect.ImmutableSet;
import com.google.common.collect.Iterables;
import org.apache.druid.indexer.TaskStatus;
Expand All @@ -34,6 +35,8 @@
import org.apache.druid.indexing.common.TaskLockType;
import org.apache.druid.indexing.common.TaskToolbox;
import org.apache.druid.indexing.common.TimeChunkLock;
import org.apache.druid.indexing.common.actions.SegmentAllocateAction;
import org.apache.druid.indexing.common.actions.SegmentAllocateRequest;
import org.apache.druid.indexing.common.actions.TaskActionClient;
import org.apache.druid.indexing.common.config.TaskConfig;
import org.apache.druid.indexing.common.config.TaskStorageConfig;
Expand All @@ -46,6 +49,7 @@
import org.apache.druid.java.util.common.ISE;
import org.apache.druid.java.util.common.Intervals;
import org.apache.druid.java.util.common.StringUtils;
import org.apache.druid.java.util.common.granularity.Granularities;
import org.apache.druid.java.util.emitter.EmittingLogger;
import org.apache.druid.java.util.emitter.service.ServiceEmitter;
import org.apache.druid.metadata.DerbyMetadataStorageActionHandlerFactory;
Expand Down Expand Up @@ -1727,6 +1731,117 @@ public void testConflictsWithOverlappingSharedLocks()
validator.expectActiveLocks(conflictingLock, floorLock);
}

@Test
public void testDoNotCleanUsedLockAfterSegmentAllocationFailure()
{
final Task task = NoopTask.create();
final Interval theInterval = Intervals.of("2023/2024");
taskStorage.insert(task, TaskStatus.running(task.getId()));

final TaskLockbox testLockbox = new SegmentAllocationFailingTaskLockbox(taskStorage, metadataStorageCoordinator);
testLockbox.add(task);
final LockResult lockResult = testLockbox.tryLock(task, new TimeChunkLockRequest(
TaskLockType.SHARED,
task,
theInterval,
null
));
Assert.assertTrue(lockResult.isOk());

SegmentAllocateRequest request = new SegmentAllocateRequest(
task,
new SegmentAllocateAction(
task.getDataSource(),
DateTimes.of("2023-01-01"),
Granularities.NONE,
Granularities.YEAR,
task.getId(),
null,
false,
null,
null,
TaskLockType.SHARED
),
90
);

try {
testLockbox.allocateSegments(
ImmutableList.of(request),
"DS",
theInterval,
false,
LockGranularity.TIME_CHUNK
);
}
catch (Exception e) {
// do nothing
}
Assert.assertFalse(testLockbox.getAllLocks().isEmpty());
Assert.assertEquals(
lockResult.getTaskLock(),
testLockbox.getOnlyTaskLockPosseContainingInterval(task, theInterval).get(0).getTaskLock()
);
}

@Test
public void testCleanUpLocksAfterSegmentAllocationFailure()
{
final Task task = NoopTask.create();
taskStorage.insert(task, TaskStatus.running(task.getId()));

final TaskLockbox testLockbox = new SegmentAllocationFailingTaskLockbox(taskStorage, metadataStorageCoordinator);
testLockbox.add(task);

SegmentAllocateRequest request0 = new SegmentAllocateRequest(
task,
new SegmentAllocateAction(
task.getDataSource(),
DateTimes.of("2023-01-01"),
Granularities.NONE,
Granularities.YEAR,
task.getId(),
null,
false,
null,
null,
TaskLockType.SHARED
),
90
);

SegmentAllocateRequest request1 = new SegmentAllocateRequest(
task,
new SegmentAllocateAction(
task.getDataSource(),
DateTimes.of("2023-01-01"),
Granularities.NONE,
Granularities.MONTH,
task.getId(),
null,
false,
null,
null,
TaskLockType.SHARED
),
90
);

try {
testLockbox.allocateSegments(
ImmutableList.of(request0, request1),
"DS",
Intervals.of("2023/2024"),
false,
LockGranularity.TIME_CHUNK
);
}
catch (Exception e) {
// do nothing
}
Assert.assertTrue(testLockbox.getAllLocks().isEmpty());
}


private class TaskLockboxValidator
{
Expand Down Expand Up @@ -1953,4 +2068,26 @@ protected TaskLockPosse verifyAndCreateOrFindLockPosse(Task task, TaskLock taskL
.contains("FailingLockAcquisition") ? null : super.verifyAndCreateOrFindLockPosse(task, taskLock);
}
}

private static class SegmentAllocationFailingTaskLockbox extends TaskLockbox
{
public SegmentAllocationFailingTaskLockbox(
TaskStorage taskStorage,
IndexerMetadataStorageCoordinator metadataStorageCoordinator
)
{
super(taskStorage, metadataStorageCoordinator);
}

@Override
void allocateSegmentIds(
String dataSource,
Interval interval,
boolean skipSegmentLineageCheck,
Collection<SegmentAllocationHolder> holders
)
{
throw new RuntimeException("This lockbox cannot allocate segemnts.");
}
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -19,7 +19,6 @@

package org.apache.druid.java.util.common.parsers;

import javax.annotation.Nullable;
import java.io.Closeable;
import java.io.IOException;
import java.io.UncheckedIOException;
Expand Down Expand Up @@ -62,37 +61,37 @@ public void close() throws IOException

default <R> CloseableIterator<R> flatMap(Function<T, CloseableIterator<R>> function)
{
final CloseableIterator<T> delegate = this;
final CloseableIterator<T> outerIterator = this;

return new CloseableIterator<R>()
{
CloseableIterator<R> iterator = findNextIteratorIfNecessary();
CloseableIterator<R> currInnerIterator = null;

@Nullable
private CloseableIterator<R> findNextIteratorIfNecessary()
private void findNextIteratorIfNecessary()
{
while ((iterator == null || !iterator.hasNext()) && delegate.hasNext()) {
if (iterator != null) {
while ((currInnerIterator == null || !currInnerIterator.hasNext()) && outerIterator.hasNext()) {
if (currInnerIterator != null) {
try {
iterator.close();
iterator = null;
currInnerIterator.close();
currInnerIterator = null;
}
catch (IOException e) {
throw new UncheckedIOException(e);
}
}
iterator = function.apply(delegate.next());
if (iterator.hasNext()) {
return iterator;
currInnerIterator = function.apply(outerIterator.next());
if (currInnerIterator.hasNext()) {
return;
}
}
return null;
}

@Override
public boolean hasNext()
{
return iterator != null && iterator.hasNext();
// closes the current iterator if it is finished, and opens a new non-empty iterator if possible
findNextIteratorIfNecessary();
return currInnerIterator != null && currInnerIterator.hasNext();
}

@Override
Expand All @@ -101,21 +100,16 @@ public R next()
if (!hasNext()) {
throw new NoSuchElementException();
}
try {
return iterator.next();
}
finally {
findNextIteratorIfNecessary();
}
return currInnerIterator.next();
}

@Override
public void close() throws IOException
{
delegate.close();
if (iterator != null) {
iterator.close();
iterator = null;
outerIterator.close();
if (currInnerIterator != null) {
currInnerIterator.close();
currInnerIterator = null;
}
}
};
Expand Down
Loading

0 comments on commit 237065d

Please sign in to comment.