Skip to content

Commit

Permalink
ODP-925|Druid PR-14767 | migrating guave to v27.0-jre
Browse files Browse the repository at this point in the history
  • Loading branch information
manishsinghmowall committed Dec 15, 2023
1 parent 4f998db commit 66b0a20
Show file tree
Hide file tree
Showing 41 changed files with 167 additions and 94 deletions.
3 changes: 0 additions & 3 deletions codestyle/guava16-forbidden-apis.txt
Original file line number Diff line number Diff line change
@@ -1,3 +0,0 @@
# Those signatures are only available in Guava 16:
com.google.common.util.concurrent.MoreExecutors#sameThreadExecutor() @ Use org.apache.druid.java.util.common.concurrent.Execs#directExecutor()
com.google.common.base.Objects#firstNonNull(java.lang.Object, java.lang.Object) @ Use org.apache.druid.common.guava.GuavaUtils#firstNonNull(java.lang.Object, java.lang.Object) instead (probably... the GuavaUtils method return object is nullable)
Original file line number Diff line number Diff line change
Expand Up @@ -1199,7 +1199,7 @@ public void onFailure(Throwable t)
settableFuture.setException(t);
}
}
});
}, MoreExecutors.directExecutor());

taskFutures.add(settableFuture);
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -23,6 +23,7 @@
import com.google.common.util.concurrent.FutureCallback;
import com.google.common.util.concurrent.Futures;
import com.google.common.util.concurrent.ListenableFuture;
import com.google.common.util.concurrent.MoreExecutors;
import com.google.common.util.concurrent.SettableFuture;
import org.apache.druid.frame.channel.ReadableByteChunksFrameChannel;
import org.apache.druid.frame.key.ClusterByPartitions;
Expand Down Expand Up @@ -158,7 +159,8 @@ public void onFailure(Throwable t)
{
retVal.setException(new MSQException(t, new WorkerRpcFailedFault(workerTaskId)));
}
}
},
MoreExecutors.directExecutor()
);

return retVal;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -32,6 +32,7 @@
import com.google.common.util.concurrent.Futures;
import com.google.common.util.concurrent.ListenableFuture;
import com.google.common.util.concurrent.ListeningExecutorService;
import com.google.common.util.concurrent.MoreExecutors;
import com.google.common.util.concurrent.SettableFuture;
import it.unimi.dsi.fastutil.bytes.ByteArrays;
import org.apache.druid.common.guava.FutureUtils;
Expand Down Expand Up @@ -1309,7 +1310,8 @@ public void onFailure(final Throwable t)
kernelHolder.getStageKernelMap().get(stageDef.getId()).fail(t)
);
}
}
},
MoreExecutors.directExecutor()
);
}

Expand Down Expand Up @@ -1601,7 +1603,7 @@ public OutputChannel openNilChannel(int expectedZero)
};

// Chain futures so we only sort one partition at a time.
nextFuture = Futures.transform(
nextFuture = Futures.transformAsync(
nextFuture,
(AsyncFunction<OutputChannel, OutputChannel>) ignored -> {
final SuperSorter sorter = new SuperSorter(
Expand All @@ -1628,7 +1630,8 @@ public OutputChannel openNilChannel(int expectedZero)
);

return FutureUtils.transform(sorter.run(), r -> Iterables.getOnlyElement(r.getAllChannels()));
}
},
MoreExecutors.directExecutor()
);

sortedChannelFutures.add(nextFuture);
Expand All @@ -1654,16 +1657,18 @@ public ListenableFuture<OutputChannels> build()
throw new ISE("Not initialized");
}

return Futures.transform(
return Futures.transformAsync(
pipelineFuture,
(AsyncFunction<ResultAndChannels<?>, OutputChannels>) resultAndChannels ->
Futures.transform(
resultAndChannels.getResultFuture(),
(Function<Object, OutputChannels>) input -> {
sanityCheckOutputChannels(resultAndChannels.getOutputChannels());
return resultAndChannels.getOutputChannels();
}
)
},
MoreExecutors.directExecutor()
),
MoreExecutors.directExecutor()
);
}

Expand Down Expand Up @@ -1731,7 +1736,8 @@ public void onFailure(Throwable t)
}
);
}
}
},
MoreExecutors.directExecutor()
);

return new ResultAndChannels<>(
Expand Down Expand Up @@ -1761,7 +1767,7 @@ private void pushAsync(final ExceptionalFunction<ResultAndChannels<?>, Listenabl
}

pipelineFuture = FutureUtils.transform(
Futures.transform(
Futures.transformAsync(
pipelineFuture,
new AsyncFunction<ResultAndChannels<?>, ResultAndChannels<?>>()
{
Expand All @@ -1770,7 +1776,8 @@ public ListenableFuture<ResultAndChannels<?>> apply(ResultAndChannels<?> t) thro
{
return fn.apply(t);
}
}
},
MoreExecutors.directExecutor()
),
resultAndChannels -> new ResultAndChannels<>(
resultAndChannels.getResultFuture(),
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -23,6 +23,7 @@
import com.google.common.util.concurrent.FutureCallback;
import com.google.common.util.concurrent.Futures;
import com.google.common.util.concurrent.ListenableFuture;
import com.google.common.util.concurrent.MoreExecutors;
import com.google.common.util.concurrent.SettableFuture;
import org.apache.druid.common.guava.FutureUtils;
import org.apache.druid.java.util.common.ISE;
Expand Down Expand Up @@ -210,7 +211,7 @@ public void onFailure(Throwable t)
}

}
});
}, MoreExecutors.directExecutor());

FutureUtils.getUnchecked(kernelActionFuture, true);
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -24,6 +24,7 @@
import com.google.common.util.concurrent.FutureCallback;
import com.google.common.util.concurrent.Futures;
import com.google.common.util.concurrent.ListenableFuture;
import com.google.common.util.concurrent.MoreExecutors;
import com.google.common.util.concurrent.SettableFuture;
import com.google.errorprone.annotations.concurrent.GuardedBy;
import org.apache.druid.common.guava.FutureUtils;
Expand Down Expand Up @@ -268,7 +269,8 @@ public void onFailure(Throwable t)
{
retVal.setException(t);
}
}
},
MoreExecutors.directExecutor()
);

return retVal;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -23,6 +23,7 @@
import com.google.common.util.concurrent.FutureCallback;
import com.google.common.util.concurrent.Futures;
import com.google.common.util.concurrent.ListenableFuture;
import com.google.common.util.concurrent.MoreExecutors;
import org.apache.druid.frame.channel.ReadableByteChunksFrameChannel;
import org.apache.druid.frame.channel.ReadableFrameChannel;
import org.apache.druid.java.util.common.StringUtils;
Expand Down Expand Up @@ -90,7 +91,8 @@ public void onFailure(Throwable t)
{
channel.setError(t);
}
}
},
MoreExecutors.directExecutor()
);
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -22,6 +22,7 @@
import com.google.common.util.concurrent.FutureCallback;
import com.google.common.util.concurrent.Futures;
import com.google.common.util.concurrent.ListenableFuture;
import com.google.common.util.concurrent.MoreExecutors;
import org.apache.druid.common.guava.FutureUtils;

import javax.annotation.Nullable;
Expand Down Expand Up @@ -61,7 +62,8 @@ public void onFailure(Throwable t)
inputFuture.cancel(true);
}
}
}
},
MoreExecutors.directExecutor()
);
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -31,6 +31,7 @@
import com.google.common.util.concurrent.Futures;
import com.google.common.util.concurrent.ListenableFuture;
import org.apache.druid.data.input.Committer;
import com.google.common.util.concurrent.MoreExecutors;
import org.apache.druid.data.input.Firehose;
import org.apache.druid.data.input.FirehoseFactory;
import org.apache.druid.data.input.InputRow;
Expand Down Expand Up @@ -698,9 +699,10 @@ private void publishSegments(
committerSupplier.get(),
Collections.singletonList(sequenceName)
);
pendingHandoffs.add(Futures.transform(
pendingHandoffs.add(Futures.transformAsync(
publishFuture,
(AsyncFunction<SegmentsAndCommitMetadata, SegmentsAndCommitMetadata>) driver::registerHandoff
(AsyncFunction<SegmentsAndCommitMetadata, SegmentsAndCommitMetadata>) driver::registerHandoff,
MoreExecutors.directExecutor()
));
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -24,6 +24,7 @@
import com.google.common.util.concurrent.FutureCallback;
import com.google.common.util.concurrent.Futures;
import com.google.common.util.concurrent.ListenableFuture;
import com.google.common.util.concurrent.MoreExecutors;
import org.apache.druid.indexer.TaskState;
import org.apache.druid.indexer.TaskStatusPlus;
import org.apache.druid.indexing.common.TaskToolbox;
Expand Down Expand Up @@ -262,7 +263,8 @@ public void onFailure(Throwable t)
LOG.error(t, "Error while running a task for spec[%s]", spec.getId());
taskCompleteEvents.offer(SubTaskCompleteEvent.fail(spec, t));
}
}
},
MoreExecutors.directExecutor()
);
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -54,7 +54,7 @@ public boolean matches(char c)
if (inQuotes) {
return false;
}
return CharMatcher.BREAKING_WHITESPACE.matches(c);
return CharMatcher.breakingWhitespace().matches(c);
}
}
).omitEmptyStrings().split(string).iterator();
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -266,7 +266,8 @@ public void onFailure(Throwable throwable)
waitingForMonitor.notifyAll();
}
}
}
},
MoreExecutors.directExecutor()
);
break;
case CHILD_UPDATED:
Expand Down Expand Up @@ -1298,7 +1299,8 @@ public void onFailure(Throwable t)
{
removedWorkerCleanups.remove(worker, cleanupTask);
}
}
},
MoreExecutors.directExecutor()
);
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -21,6 +21,7 @@

import com.google.common.annotations.VisibleForTesting;
import com.google.common.base.Objects;
import com.google.common.base.MoreObjects;
import com.google.common.base.Preconditions;
import com.google.common.collect.ComparisonChain;
import com.google.common.collect.ImmutableList;
Expand Down Expand Up @@ -1560,10 +1561,10 @@ public int hashCode()
@Override
public String toString()
{
return Objects.toStringHelper(this)
.add("taskLock", taskLock)
.add("taskIds", taskIds)
.toString();
return MoreObjects.toStringHelper(this)
.add("taskLock", taskLock)
.add("taskIds", taskIds)
.toString();
}
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -757,7 +757,8 @@ public void onFailure(Throwable t)
{
removedWorkerCleanups.remove(workerHostAndPort, cleanupTask);
}
}
},
MoreExecutors.directExecutor()
);
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -26,6 +26,7 @@
import com.google.common.util.concurrent.FutureCallback;
import com.google.common.util.concurrent.Futures;
import com.google.common.util.concurrent.ListenableFuture;
import com.google.common.util.concurrent.MoreExecutors;
import com.google.common.util.concurrent.SettableFuture;
import org.apache.druid.common.guava.FutureUtils;
import org.apache.druid.indexer.TaskLocation;
Expand Down Expand Up @@ -397,7 +398,8 @@ public void onFailure(Throwable t)
{
retVal.setException(t);
}
}
},
MoreExecutors.directExecutor()
),
sleepTime,
TimeUnit.MILLISECONDS
Expand Down Expand Up @@ -574,7 +576,8 @@ public void onFailure(Throwable t)
retVal.set(either.valueOrThrow());
}
}
}
},
MoreExecutors.directExecutor()
);

return retVal;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -36,6 +36,7 @@
import com.google.common.util.concurrent.Futures;
import com.google.common.util.concurrent.ListenableFuture;
import com.google.common.util.concurrent.SettableFuture;
import com.google.common.util.concurrent.MoreExecutors;
import org.apache.druid.data.input.Committer;
import org.apache.druid.data.input.InputFormat;
import org.apache.druid.data.input.InputRow;
Expand Down Expand Up @@ -137,6 +138,7 @@
* @param <PartitionIdType> Partition Number Type
* @param <SequenceOffsetType> Sequence Number Type
*/
@SuppressWarnings("CheckReturnValue")
public abstract class SeekableStreamIndexTaskRunner<PartitionIdType, SequenceOffsetType, RecordType extends ByteEntity> implements ChatHandler
{
public enum Status
Expand Down Expand Up @@ -697,7 +699,8 @@ public void onFailure(Throwable t)
log.error("Persist failed, dying");
backgroundThreadException = t;
}
}
},
MoreExecutors.directExecutor()
);
}

Expand Down Expand Up @@ -970,7 +973,8 @@ private void publishAndRegisterHandoff(SequenceMetadata<PartitionIdType, Sequenc
} else {
return publishedSegmentsAndMetadata;
}
}
},
MoreExecutors.directExecutor()
);
publishWaitList.add(publishFuture);

Expand Down Expand Up @@ -1026,7 +1030,8 @@ public Void apply(@Nullable SegmentsAndCommitMetadata handoffSegmentsAndCommitMe
handoffFuture.set(handoffSegmentsAndCommitMetadata);
return null;
}
}
},
MoreExecutors.directExecutor()
);
// emit segment count metric:
int segmentCount = 0;
Expand All @@ -1047,7 +1052,8 @@ public void onFailure(Throwable t)
log.error(t, "Error while publishing segments for sequenceNumber[%s]", sequenceMetadata);
handoffFuture.setException(t);
}
}
},
MoreExecutors.directExecutor()
);
}

Expand Down
Loading

0 comments on commit 66b0a20

Please sign in to comment.