Skip to content

Commit

Permalink
SeekableStreamSupervisor: Use workerExec as the client connectExec. (#…
Browse files Browse the repository at this point in the history
…17394)

* SeekableStreamSupervisor: Use workerExec as the client connectExec.

This patch uses the already-existing per-supervisor workerExec as the
connectExec for task clients, rather than using the process-wide default
ServiceClientFactory pool.

This helps prevent callbacks from backlogging on the process-wide pool.
It's especially useful for retries, where callbacks may need to establish
new TCP connections or perform TLS handshakes.

* Fix compilation, tests.

* Fix style.
  • Loading branch information
gianm authored Oct 23, 2024
1 parent 1157ecd commit 60dadde
Showing 8 changed files with 785 additions and 321 deletions.
Original file line number Diff line number Diff line change
@@ -25,17 +25,17 @@
import org.apache.druid.guice.annotations.EscalatedGlobal;
import org.apache.druid.guice.annotations.Json;
import org.apache.druid.indexing.seekablestream.SeekableStreamIndexTaskClientFactory;
import org.apache.druid.rpc.ServiceClientFactory;
import org.apache.druid.java.util.http.client.HttpClient;

@LazySingleton
public class RabbitStreamIndexTaskClientFactory extends SeekableStreamIndexTaskClientFactory<String, Long>
{
@Inject
public RabbitStreamIndexTaskClientFactory(
@EscalatedGlobal ServiceClientFactory serviceClientFactory,
@EscalatedGlobal HttpClient httpClient,
@Json ObjectMapper mapper)
{
super(serviceClientFactory, mapper);
super(httpClient, mapper);
}

@Override
Original file line number Diff line number Diff line change
@@ -26,18 +26,18 @@
import org.apache.druid.guice.annotations.EscalatedGlobal;
import org.apache.druid.guice.annotations.Json;
import org.apache.druid.indexing.seekablestream.SeekableStreamIndexTaskClientFactory;
import org.apache.druid.rpc.ServiceClientFactory;
import org.apache.druid.java.util.http.client.HttpClient;

@LazySingleton
public class KafkaIndexTaskClientFactory extends SeekableStreamIndexTaskClientFactory<KafkaTopicPartition, Long>
{
@Inject
public KafkaIndexTaskClientFactory(
@EscalatedGlobal ServiceClientFactory serviceClientFactory,
@EscalatedGlobal HttpClient httpClient,
@Json ObjectMapper mapper
)
{
super(serviceClientFactory, mapper);
super(httpClient, mapper);
}

@Override
Loading

0 comments on commit 60dadde

Please sign in to comment.