From a34a06e1926e2006807a252a9387dc13b926db70 Mon Sep 17 00:00:00 2001
From: Clint Wylie
Date: Fri, 19 Jul 2024 14:37:21 -0700
Subject: [PATCH] remove Firehose and FirehoseFactory (#16758)
changes:
* Removed `Firehose` and `FirehoseFactory` along with the remaining implementations, most of which were no longer used after #16602
* Moved `IngestSegmentFirehose`, which was still used internally by Hadoop ingestion, to `DatasourceRecordReader.SegmentReader`
* Renamed `SQLFirehoseDatabaseConnector` to `SQLInputSourceDatabaseConnector`, with similar renames for its sub-classes
* Moved anything remaining in a 'firehose' package to a more appropriate package
* Cleaned up the documentation related to firehoses
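
For extension code that compiled against the relocated chat handler classes, the change should amount to an import update only. A minimal sketch under that assumption (the `MyExtensionChatHandler` class below is hypothetical and not part of this patch):

```java
// Old imports removed by this patch (package org.apache.druid.segment.realtime.firehose):
//   import org.apache.druid.segment.realtime.firehose.ChatHandler;
//   import org.apache.druid.segment.realtime.firehose.ChatHandlerProvider;
// New locations after the package move:
import org.apache.druid.segment.realtime.ChatHandler;
import org.apache.druid.segment.realtime.ChatHandlerProvider;

// Hypothetical extension class, shown only to illustrate the import change;
// ChatHandler is a marker interface, so no methods need to be overridden.
public class MyExtensionChatHandler implements ChatHandler
{
  private final ChatHandlerProvider chatHandlerProvider;

  public MyExtensionChatHandler(ChatHandlerProvider chatHandlerProvider)
  {
    this.chatHandlerProvider = chatHandlerProvider;
  }
}
```

The SQL connector renames follow the same pattern, e.g. `org.apache.druid.firehose.sql.MySQLFirehoseDatabaseConnector` becomes `org.apache.druid.metadata.input.MySQLInputSourceDatabaseConnector`.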
---
docs/configuration/extensions.md | 2 +-
docs/configuration/index.md | 5 +-
.../extensions-contrib/cloudfiles.md | 56 --
.../development/extensions-core/postgresql.md | 2 +-
docs/development/overview.md | 13 +-
docs/ingestion/native-batch-firehose.md | 316 +--------
docs/operations/metrics.md | 9 -
.../migrate-from-firehose-ingestion.md | 4 +-
docs/operations/security-overview.md | 2 +-
.../k8s/overlord/common/K8sTestUtils.java | 1 -
.../DruidPeonClientIntegrationTest.java | 5 -
.../taskadapter/K8sTaskAdapterTest.java | 5 -
.../MultiContainerTaskAdapterTest.java | 5 -
.../SingleContainerTaskAdapterTest.java | 5 -
.../MaterializedViewSupervisorSpec.java | 2 +-
.../MaterializedViewSupervisorSpecTest.java | 4 +-
.../MaterializedViewSupervisorTest.java | 2 +-
.../src/main/resources/defaultMetrics.json | 3 -
extensions-core/azure-extensions/pom.xml | 4 -
.../kinesis/KinesisIndexTaskSerdeTest.java | 4 +-
.../indexing/IndexerControllerContext.java | 2 +-
.../client/ControllerChatHandler.java | 2 +-
.../indexing/client/WorkerChatHandler.java | 4 +-
.../druid/msq/test/MSQTestWorkerContext.java | 2 +-
.../MySQLInputSourceDatabaseConnector.java} | 10 +-
.../mysql/MySQLMetadataStorageModule.java | 4 +-
...ySQLInputSourceDatabaseConnectorTest.java} | 34 +-
...stgresqlInputSourceDatabaseConnector.java} | 12 +-
.../PostgreSQLMetadataStorageModule.java | 4 +-
...esqlInputSourceDatabaseConnectorTest.java} | 26 +-
.../hadoop/DatasourceRecordReader.java | 193 ++++-
.../indexer/BatchDeltaIngestionTest.java | 12 +-
...tasourceRecordReaderSegmentReaderTest.java | 19 +-
.../guice/IndexingServiceFirehoseModule.java | 49 --
.../druid/indexing/common/TaskToolbox.java | 2 +-
.../indexing/common/TaskToolboxFactory.java | 2 +-
.../indexing/common/task/HadoopIndexTask.java | 4 +-
.../druid/indexing/common/task/IndexTask.java | 34 +-
.../common/task/NativeCompactionRunner.java | 1 -
.../druid/indexing/common/task/Task.java | 12 +-
...putSourceSplitParallelIndexTaskRunner.java | 4 -
.../parallel/LegacySinglePhaseSubTask.java | 3 -
.../batch/parallel/ParallelIndexIOConfig.java | 17 +-
.../parallel/ParallelIndexSupervisorTask.java | 9 +-
.../PartialDimensionCardinalityTask.java | 3 -
.../PartialDimensionDistributionTask.java | 3 -
.../PartialHashSegmentGenerateTask.java | 3 -
.../PartialRangeSegmentGenerateTask.java | 3 -
.../parallel/PartialSegmentGenerateTask.java | 2 +-
.../SinglePhaseParallelIndexTaskRunner.java | 4 -
.../batch/parallel/SinglePhaseSubTask.java | 5 +-
.../IndexTaskInputRowIteratorBuilder.java | 2 -
.../indexing/input/DruidInputSource.java | 5 +-
.../indexing/input/DruidSegmentReader.java | 2 +-
.../WindowedSegmentId.java | 2 +-
.../overlord/sampler/InputSourceSampler.java | 4 +-
.../SeekableStreamIndexTask.java | 2 +-
.../SeekableStreamIndexTaskRunner.java | 2 +-
.../indexing/common/TaskToolboxTest.java | 2 +-
.../druid/indexing/common/TestFirehose.java | 118 ----
.../druid/indexing/common/TestIndexTask.java | 1 -
.../druid/indexing/common/TestUtils.java | 10 +-
.../common/task/BatchAppenderatorsTest.java | 2 +-
.../ClientCompactionTaskQuerySerdeTest.java | 4 +-
.../task/CompactionTaskParallelRunTest.java | 3 +-
.../common/task/CompactionTaskRunTest.java | 4 +-
.../common/task/CompactionTaskTest.java | 4 +-
.../common/task/IndexIngestionSpecTest.java | 2 -
.../indexing/common/task/IndexTaskTest.java | 7 +-
.../common/task/IngestionTestBase.java | 7 +-
.../indexing/common/task/TaskSerdeTest.java | 29 +-
...bstractMultiPhaseParallelIndexingTest.java | 2 -
...stractParallelIndexSupervisorTaskTest.java | 4 +-
.../parallel/HashPartitionTaskKillTest.java | 2 -
...aseParallelIndexingWithNullColumnTest.java | 4 -
.../ParallelIndexSupervisorTaskKillTest.java | 3 -
...rallelIndexSupervisorTaskResourceTest.java | 2 -
.../ParallelIndexSupervisorTaskSerdeTest.java | 1 -
.../ParallelIndexSupervisorTaskTest.java | 3 -
.../parallel/ParallelIndexTestingFactory.java | 2 +-
.../parallel/RangePartitionTaskKillTest.java | 1 -
.../SinglePhaseParallelIndexingTest.java | 6 -
.../parallel/SinglePhaseSubTaskSpecTest.java | 1 -
.../indexing/input/DruidInputSourceTest.java | 1 -
.../WindowedSegmentIdTest.java | 2 +-
.../overlord/ForkingTaskRunnerTest.java | 2 -
.../SingleTaskBackgroundRunnerTest.java | 2 +-
.../indexing/overlord/TaskLifecycleTest.java | 45 +-
.../indexing/overlord/TaskQueueTest.java | 1 -
.../overlord/TestTaskToolboxFactory.java | 2 +-
.../overlord/http/OverlordResourceTest.java | 2 +-
.../sampler/IndexTaskSamplerSpecTest.java | 5 +-
.../SeekableStreamIndexTaskTestBase.java | 2 +-
.../indexing/worker/TaskAnnouncementTest.java | 2 +-
.../worker/WorkerTaskManagerTest.java | 2 +-
.../worker/WorkerTaskMonitorTest.java | 2 +-
.../ITBestEffortRollupParallelIndexTest.java | 2 +-
.../EventReceiverFirehoseTestClient.java | 181 -----
.../testing/guice/DruidTestModuleFactory.java | 2 -
.../ITBestEffortRollupParallelIndexTest.java | 2 +-
.../ITPerfectRollupParallelIndexTest.java | 2 +-
.../org/apache/druid/data/input/Firehose.java | 97 ---
.../druid/data/input/FirehoseFactory.java | 82 ---
.../data/input/SegmentsSplitHintSpec.java | 5 +-
.../org/apache/druid/segment/Metadata.java | 2 +-
.../impl/InputEntityIteratingReaderTest.java | 4 +-
.../apache/druid/guice/FirehoseModule.java | 55 --
.../initialization/CoreInjectorBuilder.java | 2 -
.../druid/metadata/BasicDataSourceExt.java | 2 +-
...a => SQLInputSourceDatabaseConnector.java} | 2 +-
.../druid/metadata/input/SqlEntity.java | 20 +-
.../druid/metadata/input/SqlInputSource.java | 23 +-
.../rpc/indexing/SpecificTaskRetryPolicy.java | 2 +-
.../realtime/{firehose => }/ChatHandler.java | 2 +-
.../{firehose => }/ChatHandlerProvider.java | 2 +-
.../{firehose => }/ChatHandlerResource.java | 2 +-
.../realtime/{firehose => }/ChatHandlers.java | 2 +-
.../druid/segment/realtime/FireHydrant.java | 2 +
.../NoopChatHandlerProvider.java | 2 +-
.../ServiceAnnouncingChatHandlerProvider.java | 2 +-
.../WindowedStorageAdapter.java | 2 +-
.../appenderator/AppenderatorImpl.java | 4 +-
.../appenderator/BatchAppenderator.java | 2 +-
.../appenderator/StreamAppenderator.java | 2 +-
.../firehose/ClippedFirehoseFactory.java | 81 ---
.../realtime/firehose/EventReceiver.java | 28 -
.../EventReceiverFirehoseFactory.java | 659 ------------------
.../firehose/FixedCountFirehoseFactory.java | 93 ---
.../firehose/IngestSegmentFirehose.java | 210 ------
.../realtime/firehose/PredicateFirehose.java | 89 ---
.../firehose/TimedShutoffFirehoseFactory.java | 139 ----
.../realtime/{firehose => }/package-info.java | 2 +-
.../jetty/ChatHandlerServerModule.java | 2 +-
.../jetty/CliIndexerServerModule.java | 2 +-
.../TaskIdResponseHeaderFilterHolder.java | 2 +-
.../metrics/EventReceiverFirehoseMetric.java | 48 --
.../metrics/EventReceiverFirehoseMonitor.java | 90 ---
.../EventReceiverFirehoseRegister.java | 56 --
.../druid/server/metrics/MetricsModule.java | 1 -
.../discovery/ServiceAnnouncerTest.java | 27 +-
.../druid/guice/FirehoseModuleTest.java | 93 ---
.../druid/metadata/input/SqlEntityTest.java | 4 +-
.../metadata/input/SqlInputSourceTest.java | 111 ++-
.../druid/metadata/input/SqlTestUtils.java | 55 +-
.../ChatHandlerResourceTest.java | 2 +-
...viceAnnouncingChatHandlerProviderTest.java | 2 +-
.../EventReceiverFirehoseIdleTest.java | 136 ----
.../firehose/EventReceiverFirehoseTest.java | 442 ------------
.../java/org/apache/druid/cli/CliIndexer.java | 2 -
.../apache/druid/cli/CliMiddleManager.java | 6 +-
.../org/apache/druid/cli/CliOverlord.java | 6 +-
.../java/org/apache/druid/cli/CliPeon.java | 8 +-
.../cli/validate/DruidJsonValidator.java | 4 -
.../cli/validate/DruidJsonValidatorTest.java | 1 -
website/.spelling | 24 +-
155 files changed, 534 insertions(+), 3646 deletions(-)
rename extensions-core/mysql-metadata-storage/src/main/java/org/apache/druid/{firehose/sql/MySQLFirehoseDatabaseConnector.java => metadata/input/MySQLInputSourceDatabaseConnector.java} (91%)
rename extensions-core/mysql-metadata-storage/src/test/java/org/apache/druid/{firehose/sql/MySQLFirehoseDatabaseConnectorTest.java => metadata/input/MySQLInputSourceDatabaseConnectorTest.java} (92%)
rename extensions-core/postgresql-metadata-storage/src/main/java/org/apache/druid/{firehose/PostgresqlFirehoseDatabaseConnector.java => metadata/input/PostgresqlInputSourceDatabaseConnector.java} (87%)
rename extensions-core/postgresql-metadata-storage/src/test/java/org/apache/druid/{firehose/PostgresqlFirehoseDatabaseConnectorTest.java => metadata/input/PostgresqlInputSourceDatabaseConnectorTest.java} (90%)
rename server/src/test/java/org/apache/druid/segment/realtime/firehose/IngestSegmentFirehoseTest.java => indexing-hadoop/src/test/java/org/apache/druid/indexer/hadoop/DatasourceRecordReaderSegmentReaderTest.java (92%)
delete mode 100644 indexing-service/src/main/java/org/apache/druid/guice/IndexingServiceFirehoseModule.java
rename indexing-service/src/main/java/org/apache/druid/indexing/{firehose => input}/WindowedSegmentId.java (98%)
delete mode 100644 indexing-service/src/test/java/org/apache/druid/indexing/common/TestFirehose.java
rename indexing-service/src/test/java/org/apache/druid/indexing/{firehose => input}/WindowedSegmentIdTest.java (96%)
delete mode 100644 integration-tests/src/main/java/org/apache/druid/testing/clients/EventReceiverFirehoseTestClient.java
delete mode 100644 processing/src/main/java/org/apache/druid/data/input/Firehose.java
delete mode 100644 processing/src/main/java/org/apache/druid/data/input/FirehoseFactory.java
delete mode 100644 server/src/main/java/org/apache/druid/guice/FirehoseModule.java
rename server/src/main/java/org/apache/druid/metadata/{SQLFirehoseDatabaseConnector.java => SQLInputSourceDatabaseConnector.java} (98%)
rename server/src/main/java/org/apache/druid/segment/realtime/{firehose => }/ChatHandler.java (95%)
rename server/src/main/java/org/apache/druid/segment/realtime/{firehose => }/ChatHandlerProvider.java (97%)
rename server/src/main/java/org/apache/druid/segment/realtime/{firehose => }/ChatHandlerResource.java (98%)
rename server/src/main/java/org/apache/druid/segment/realtime/{firehose => }/ChatHandlers.java (97%)
rename server/src/main/java/org/apache/druid/segment/realtime/{firehose => }/NoopChatHandlerProvider.java (96%)
rename server/src/main/java/org/apache/druid/segment/realtime/{firehose => }/ServiceAnnouncingChatHandlerProvider.java (98%)
rename server/src/main/java/org/apache/druid/segment/realtime/{firehose => }/WindowedStorageAdapter.java (96%)
delete mode 100644 server/src/main/java/org/apache/druid/segment/realtime/firehose/ClippedFirehoseFactory.java
delete mode 100644 server/src/main/java/org/apache/druid/segment/realtime/firehose/EventReceiver.java
delete mode 100644 server/src/main/java/org/apache/druid/segment/realtime/firehose/EventReceiverFirehoseFactory.java
delete mode 100644 server/src/main/java/org/apache/druid/segment/realtime/firehose/FixedCountFirehoseFactory.java
delete mode 100644 server/src/main/java/org/apache/druid/segment/realtime/firehose/IngestSegmentFirehose.java
delete mode 100644 server/src/main/java/org/apache/druid/segment/realtime/firehose/PredicateFirehose.java
delete mode 100644 server/src/main/java/org/apache/druid/segment/realtime/firehose/TimedShutoffFirehoseFactory.java
rename server/src/main/java/org/apache/druid/segment/realtime/{firehose => }/package-info.java (94%)
delete mode 100644 server/src/main/java/org/apache/druid/server/metrics/EventReceiverFirehoseMetric.java
delete mode 100644 server/src/main/java/org/apache/druid/server/metrics/EventReceiverFirehoseMonitor.java
delete mode 100644 server/src/main/java/org/apache/druid/server/metrics/EventReceiverFirehoseRegister.java
delete mode 100644 server/src/test/java/org/apache/druid/guice/FirehoseModuleTest.java
rename server/src/test/java/org/apache/druid/segment/realtime/{firehose => }/ChatHandlerResourceTest.java (97%)
rename server/src/test/java/org/apache/druid/segment/realtime/{firehose => }/ServiceAnnouncingChatHandlerProviderTest.java (99%)
delete mode 100644 server/src/test/java/org/apache/druid/segment/realtime/firehose/EventReceiverFirehoseIdleTest.java
delete mode 100644 server/src/test/java/org/apache/druid/segment/realtime/firehose/EventReceiverFirehoseTest.java
diff --git a/docs/configuration/extensions.md b/docs/configuration/extensions.md
index d396bc29000d..bc7a05a4ae23 100644
--- a/docs/configuration/extensions.md
+++ b/docs/configuration/extensions.md
@@ -80,7 +80,7 @@ All of these community extensions can be downloaded using [pull-deps](../operati
|aliyun-oss-extensions|Aliyun OSS deep storage |[link](../development/extensions-contrib/aliyun-oss-extensions.md)|
|ambari-metrics-emitter|Ambari Metrics Emitter |[link](../development/extensions-contrib/ambari-metrics-emitter.md)|
|druid-cassandra-storage|Apache Cassandra deep storage.|[link](../development/extensions-contrib/cassandra.md)|
-|druid-cloudfiles-extensions|Rackspace Cloudfiles deep storage and firehose.|[link](../development/extensions-contrib/cloudfiles.md)|
+|druid-cloudfiles-extensions|Rackspace Cloudfiles deep storage.|[link](../development/extensions-contrib/cloudfiles.md)|
|druid-compressed-bigdecimal|Compressed Big Decimal Type | [link](../development/extensions-contrib/compressed-big-decimal.md)|
|druid-ddsketch|Support for DDSketch approximate quantiles based on [DDSketch](https://github.com/datadog/sketches-java) | [link](../development/extensions-contrib/ddsketch-quantiles.md)|
|druid-deltalake-extensions|Support for ingesting Delta Lake tables.|[link](../development/extensions-contrib/delta-lake.md)|
diff --git a/docs/configuration/index.md b/docs/configuration/index.md
index c627e9fd7f07..39c14b77c461 100644
--- a/docs/configuration/index.md
+++ b/docs/configuration/index.md
@@ -395,7 +395,6 @@ Metric monitoring is an essential part of Druid operations. The following monito
|`org.apache.druid.java.util.metrics.CgroupCpuSetMonitor`|Reports CPU core/HT and memory node allocations as per the `cpuset` cgroup.|
|`org.apache.druid.java.util.metrics.CgroupDiskMonitor`|Reports disk statistic as per the blkio cgroup.|
|`org.apache.druid.java.util.metrics.CgroupMemoryMonitor`|Reports memory statistic as per the memory cgroup.|
-|`org.apache.druid.server.metrics.EventReceiverFirehoseMonitor`|Reports how many events have been queued in the EventReceiverFirehose.|
|`org.apache.druid.server.metrics.HistoricalMetricsMonitor`|Reports statistics on Historical services. Available only on Historical services.|
|`org.apache.druid.server.metrics.SegmentStatsMonitor` | **EXPERIMENTAL** Reports statistics about segments on Historical services. Available only on Historical services. Not to be used when lazy loading is configured.|
|`org.apache.druid.server.metrics.QueryCountStatsMonitor`|Reports how many queries have been successful/failed/interrupted.|
@@ -607,7 +606,7 @@ the [HDFS input source](../ingestion/input-sources.md#hdfs-input-source).
|Property|Possible values|Description|Default|
|--------|---------------|-----------|-------|
-|`druid.ingestion.hdfs.allowedProtocols`|List of protocols|Allowed protocols for the HDFS input source and HDFS firehose.|`["hdfs"]`|
+|`druid.ingestion.hdfs.allowedProtocols`|List of protocols|Allowed protocols for the HDFS input source.|`["hdfs"]`|
#### HTTP input source
@@ -616,7 +615,7 @@ the [HTTP input source](../ingestion/input-sources.md#http-input-source).
|Property|Possible values|Description|Default|
|--------|---------------|-----------|-------|
-|`druid.ingestion.http.allowedProtocols`|List of protocols|Allowed protocols for the HTTP input source and HTTP firehose.|`["http", "https"]`|
+|`druid.ingestion.http.allowedProtocols`|List of protocols|Allowed protocols for the HTTP input source.|`["http", "https"]`|
### External data access security configuration
diff --git a/docs/development/extensions-contrib/cloudfiles.md b/docs/development/extensions-contrib/cloudfiles.md
index 83a1d0c7e10b..d4e7592ee7f7 100644
--- a/docs/development/extensions-contrib/cloudfiles.md
+++ b/docs/development/extensions-contrib/cloudfiles.md
@@ -40,59 +40,3 @@ To use this Apache Druid extension, [include](../../configuration/extensions.md#
|`druid.cloudfiles.apiKey`||Rackspace Cloud API key.|Must be set.|
|`druid.cloudfiles.provider`|rackspace-cloudfiles-us,rackspace-cloudfiles-uk|Name of the provider depending on the region.|Must be set.|
|`druid.cloudfiles.useServiceNet`|true,false|Whether to use the internal service net.|true|
-
-## Firehose
-
-
-
-#### StaticCloudFilesFirehose
-
-This firehose ingests events, similar to the StaticAzureBlobStoreFirehose, but from Rackspace's Cloud Files.
-
-Data is newline delimited, with one JSON object per line and parsed as per the `InputRowParser` configuration.
-
-The storage account is shared with the one used for Rackspace's Cloud Files deep storage functionality, but blobs can be in a different region and container.
-
-As with the Azure blobstore, it is assumed to be gzipped if the extension ends in .gz
-
-This firehose is _splittable_ and can be used by [native parallel index tasks](../../ingestion/native-batch.md).
-Since each split represents an object in this firehose, each worker task of `index_parallel` will read an object.
-
-Sample spec:
-
-```json
-"firehose" : {
- "type" : "static-cloudfiles",
- "blobs": [
- {
- "region": "DFW"
- "container": "container",
- "path": "/path/to/your/file.json"
- },
- {
- "region": "ORD"
- "container": "anothercontainer",
- "path": "/another/path.json"
- }
- ]
-}
-```
-This firehose provides caching and prefetching features. In IndexTask, a firehose can be read twice if intervals or
-shardSpecs are not specified, and, in this case, caching can be useful. Prefetching is preferred when direct scan of objects is slow.
-
-|property|description|default|required?|
-|--------|-----------|-------|---------|
-|type|This should be `static-cloudfiles`.|N/A|yes|
-|blobs|JSON array of Cloud Files blobs.|N/A|yes|
-|maxCacheCapacityBytes|Maximum size of the cache space in bytes. 0 means disabling cache.|1073741824|no|
-|maxCacheCapacityBytes|Maximum size of the cache space in bytes. 0 means disabling cache. Cached files are not removed until the ingestion task completes.|1073741824|no|
-|maxFetchCapacityBytes|Maximum size of the fetch space in bytes. 0 means disabling prefetch. Prefetched files are removed immediately once they are read.|1073741824|no|
-|fetchTimeout|Timeout for fetching a Cloud Files object.|60000|no|
-|maxFetchRetry|Maximum retry for fetching a Cloud Files object.|3|no|
-
-Cloud Files Blobs:
-
-|property|description|default|required?|
-|--------|-----------|-------|---------|
-|container|Name of the Cloud Files container|N/A|yes|
-|path|The path where data is located.|N/A|yes|
diff --git a/docs/development/extensions-core/postgresql.md b/docs/development/extensions-core/postgresql.md
index e0d9337b2ccb..919bf372b844 100644
--- a/docs/development/extensions-core/postgresql.md
+++ b/docs/development/extensions-core/postgresql.md
@@ -87,7 +87,7 @@ In most cases, the configuration options map directly to the [postgres JDBC conn
| `druid.metadata.postgres.ssl.sslPasswordCallback` | The classname of the SSL password provider. | none | no |
| `druid.metadata.postgres.dbTableSchema` | druid meta table schema | `public` | no |
-### PostgreSQL Firehose
+### PostgreSQL InputSource
The PostgreSQL extension provides an implementation of an [SQL input source](../../ingestion/input-sources.md) which can be used to ingest data into Druid from a PostgreSQL database.
diff --git a/docs/development/overview.md b/docs/development/overview.md
index 5ff77af07cf4..11c67ddfa2c4 100644
--- a/docs/development/overview.md
+++ b/docs/development/overview.md
@@ -53,8 +53,17 @@ Most of the coordination logic for (real-time) ingestion is in the Druid indexin
## Real-time Ingestion
-Druid loads data through `FirehoseFactory.java` classes. Firehoses often wrap other firehoses, where, similar to the design of the
-query runners, each firehose adds a layer of logic, and the persist and hand-off logic is in `RealtimePlumber.java`.
+Druid streaming tasks are based on the 'seekable stream' classes such as `SeekableStreamSupervisor.java`,
+`SeekableStreamIndexTask.java`, and `SeekableStreamIndexTaskRunner.java`. The data processing happens through
+`StreamAppenderator.java`, and the persist and hand-off logic is in `StreamAppenderatorDriver.java`.
+
+## Native Batch Ingestion
+
+The main task types for Druid native batch ingestion are based on `AbstractBatchTask.java` and `AbstractBatchSubtask.java`.
+Parallel processing uses `ParallelIndexSupervisorTask.java`, which spawns subtasks to perform various operations such
+as data analysis and partitioning depending on the task specification. Segment generation happens in
+`SinglePhaseSubTask.java`, `PartialHashSegmentGenerateTask.java`, or `PartialRangeSegmentGenerateTask.java` through
+`BatchAppenderator`, and the persist and hand-off logic is in `BatchAppenderatorDriver.java`.
## Hadoop-based Batch Ingestion
diff --git a/docs/ingestion/native-batch-firehose.md b/docs/ingestion/native-batch-firehose.md
index db2b3e8779e6..16e9634ff297 100644
--- a/docs/ingestion/native-batch-firehose.md
+++ b/docs/ingestion/native-batch-firehose.md
@@ -24,319 +24,5 @@ sidebar_label: "Firehose (deprecated)"
-->
:::info
- Firehose ingestion is deprecated. See [Migrate from firehose to input source ingestion](../operations/migrate-from-firehose-ingestion.md) for instructions on migrating from firehose ingestion to using native batch ingestion input sources.
+ Firehose ingestion has been removed in Druid 26.0. See [Migrate from firehose to input source ingestion](../operations/migrate-from-firehose-ingestion.md) for instructions on migrating from firehose ingestion to using native batch ingestion input sources.
:::
-
-There are several firehoses readily available in Druid, some are meant for examples, others can be used directly in a production environment.
-
-## StaticS3Firehose
-
-You need to include the [`druid-s3-extensions`](../development/extensions-core/s3.md) as an extension to use the StaticS3Firehose.
-
-This firehose ingests events from a predefined list of S3 objects.
-This firehose is _splittable_ and can be used by the [Parallel task](./native-batch.md).
-Since each split represents an object in this firehose, each worker task of `index_parallel` will read an object.
-
-Sample spec:
-
-```json
-"firehose" : {
- "type" : "static-s3",
- "uris": ["s3://foo/bar/file.gz", "s3://bar/foo/file2.gz"]
-}
-```
-
-This firehose provides caching and prefetching features. In the Simple task, a firehose can be read twice if intervals or
-shardSpecs are not specified, and, in this case, caching can be useful. Prefetching is preferred when direct scan of objects is slow.
-Note that prefetching or caching isn't that useful in the Parallel task.
-
-|property|description|default|required?|
-|--------|-----------|-------|---------|
-|type|This should be `static-s3`.|None|yes|
-|uris|JSON array of URIs where s3 files to be ingested are located.|None|`uris` or `prefixes` must be set|
-|prefixes|JSON array of URI prefixes for the locations of s3 files to be ingested.|None|`uris` or `prefixes` must be set|
-|maxCacheCapacityBytes|Maximum size of the cache space in bytes. 0 means disabling cache. Cached files are not removed until the ingestion task completes.|1073741824|no|
-|maxFetchCapacityBytes|Maximum size of the fetch space in bytes. 0 means disabling prefetch. Prefetched files are removed immediately once they are read.|1073741824|no|
-|prefetchTriggerBytes|Threshold to trigger prefetching s3 objects.|maxFetchCapacityBytes / 2|no|
-|fetchTimeout|Timeout for fetching an s3 object.|60000|no|
-|maxFetchRetry|Maximum retry for fetching an s3 object.|3|no|
-
-## StaticGoogleBlobStoreFirehose
-
-You need to include the [`druid-google-extensions`](../development/extensions-core/google.md) as an extension to use the StaticGoogleBlobStoreFirehose.
-
-This firehose ingests events, similar to the StaticS3Firehose, but from an Google Cloud Store.
-
-As with the S3 blobstore, it is assumed to be gzipped if the extension ends in .gz
-
-This firehose is _splittable_ and can be used by the [Parallel task](./native-batch.md).
-Since each split represents an object in this firehose, each worker task of `index_parallel` will read an object.
-
-Sample spec:
-
-```json
-"firehose" : {
- "type" : "static-google-blobstore",
- "blobs": [
- {
- "bucket": "foo",
- "path": "/path/to/your/file.json"
- },
- {
- "bucket": "bar",
- "path": "/another/path.json"
- }
- ]
-}
-```
-
-This firehose provides caching and prefetching features. In the Simple task, a firehose can be read twice if intervals or
-shardSpecs are not specified, and, in this case, caching can be useful. Prefetching is preferred when direct scan of objects is slow.
-Note that prefetching or caching isn't that useful in the Parallel task.
-
-|property|description|default|required?|
-|--------|-----------|-------|---------|
-|type|This should be `static-google-blobstore`.|None|yes|
-|blobs|JSON array of Google Blobs.|None|yes|
-|maxCacheCapacityBytes|Maximum size of the cache space in bytes. 0 means disabling cache. Cached files are not removed until the ingestion task completes.|1073741824|no|
-|maxFetchCapacityBytes|Maximum size of the fetch space in bytes. 0 means disabling prefetch. Prefetched files are removed immediately once they are read.|1073741824|no|
-|prefetchTriggerBytes|Threshold to trigger prefetching Google Blobs.|maxFetchCapacityBytes / 2|no|
-|fetchTimeout|Timeout for fetching a Google Blob.|60000|no|
-|maxFetchRetry|Maximum retry for fetching a Google Blob.|3|no|
-
-Google Blobs:
-
-|property|description|default|required?|
-|--------|-----------|-------|---------|
-|bucket|Name of the Google Cloud bucket|None|yes|
-|path|The path where data is located.|None|yes|
-
-## HDFSFirehose
-
-You need to include the [`druid-hdfs-storage`](../development/extensions-core/hdfs.md) as an extension to use the HDFSFirehose.
-
-This firehose ingests events from a predefined list of files from the HDFS storage.
-This firehose is _splittable_ and can be used by the [Parallel task](./native-batch.md).
-Since each split represents an HDFS file, each worker task of `index_parallel` will read files.
-
-Sample spec:
-
-```json
-"firehose" : {
- "type" : "hdfs",
- "paths": "/foo/bar,/foo/baz"
-}
-```
-
-This firehose provides caching and prefetching features. During native batch indexing, a firehose can be read twice if
-`intervals` are not specified, and, in this case, caching can be useful. Prefetching is preferred when direct scanning
-of files is slow.
-Note that prefetching or caching isn't that useful in the Parallel task.
-
-|Property|Description|Default|
-|--------|-----------|-------|
-|type|This should be `hdfs`.|none (required)|
-|paths|HDFS paths. Can be either a JSON array or comma-separated string of paths. Wildcards like `*` are supported in these paths.|none (required)|
-|maxCacheCapacityBytes|Maximum size of the cache space in bytes. 0 means disabling cache. Cached files are not removed until the ingestion task completes.|1073741824|
-|maxFetchCapacityBytes|Maximum size of the fetch space in bytes. 0 means disabling prefetch. Prefetched files are removed immediately once they are read.|1073741824|
-|prefetchTriggerBytes|Threshold to trigger prefetching files.|maxFetchCapacityBytes / 2|
-|fetchTimeout|Timeout for fetching each file.|60000|
-|maxFetchRetry|Maximum number of retries for fetching each file.|3|
-
-You can also ingest from other storage using the HDFS firehose if the HDFS client supports that storage.
-However, if you want to ingest from cloud storage, consider using the service-specific input source for your data storage.
-If you want to use a non-hdfs protocol with the HDFS firehose, you need to include the protocol you want
-in `druid.ingestion.hdfs.allowedProtocols`. See [HDFS firehose security configuration](../configuration/index.md#hdfs-input-source) for more details.
-
-## LocalFirehose
-
-This Firehose can be used to read the data from files on local disk, and is mainly intended for proof-of-concept testing, and works with `string` typed parsers.
-This Firehose is _splittable_ and can be used by [native parallel index tasks](native-batch.md).
-Since each split represents a file in this Firehose, each worker task of `index_parallel` will read a file.
-A sample local Firehose spec is shown below:
-
-```json
-{
- "type": "local",
- "filter" : "*.csv",
- "baseDir": "/data/directory"
-}
-```
-
-|property|description|required?|
-|--------|-----------|---------|
-|type|This should be "local".|yes|
-|filter|A wildcard filter for files. See [here](http://commons.apache.org/proper/commons-io/apidocs/org/apache/commons/io/filefilter/WildcardFileFilter) for more information.|yes|
-|baseDir|directory to search recursively for files to be ingested. |yes|
-
-
-
-## HttpFirehose
-
-This Firehose can be used to read the data from remote sites via HTTP, and works with `string` typed parsers.
-This Firehose is _splittable_ and can be used by [native parallel index tasks](native-batch.md).
-Since each split represents a file in this Firehose, each worker task of `index_parallel` will read a file.
-A sample HTTP Firehose spec is shown below:
-
-```json
-{
- "type": "http",
- "uris": ["http://example.com/uri1", "http://example2.com/uri2"]
-}
-```
-
-You can only use protocols listed in the `druid.ingestion.http.allowedProtocols` property as HTTP firehose input sources.
-The `http` and `https` protocols are allowed by default. See [HTTP firehose security configuration](../configuration/index.md#http-input-source) for more details.
-
-The below configurations can be optionally used if the URIs specified in the spec require a Basic Authentication Header.
-Omitting these fields from your spec will result in HTTP requests with no Basic Authentication Header.
-
-|property|description|default|
-|--------|-----------|-------|
-|httpAuthenticationUsername|Username to use for authentication with specified URIs|None|
-|httpAuthenticationPassword|PasswordProvider to use with specified URIs|None|
-
-Example with authentication fields using the DefaultPassword provider (this requires the password to be in the ingestion spec):
-
-```json
-{
- "type": "http",
- "uris": ["http://example.com/uri1", "http://example2.com/uri2"],
- "httpAuthenticationUsername": "username",
- "httpAuthenticationPassword": "password123"
-}
-```
-
-You can also use the other existing Druid PasswordProviders. Here is an example using the EnvironmentVariablePasswordProvider:
-
-```json
-{
- "type": "http",
- "uris": ["http://example.com/uri1", "http://example2.com/uri2"],
- "httpAuthenticationUsername": "username",
- "httpAuthenticationPassword": {
- "type": "environment",
- "variable": "HTTP_FIREHOSE_PW"
- }
-}
-```
-
-The below configurations can optionally be used for tuning the Firehose performance.
-Note that prefetching or caching isn't that useful in the Parallel task.
-
-|property|description|default|
-|--------|-----------|-------|
-|maxCacheCapacityBytes|Maximum size of the cache space in bytes. 0 means disabling cache. Cached files are not removed until the ingestion task completes.|1073741824|
-|maxFetchCapacityBytes|Maximum size of the fetch space in bytes. 0 means disabling prefetch. Prefetched files are removed immediately once they are read.|1073741824|
-|prefetchTriggerBytes|Threshold to trigger prefetching HTTP objects.|maxFetchCapacityBytes / 2|
-|fetchTimeout|Timeout for fetching an HTTP object.|60000|
-|maxFetchRetry|Maximum retries for fetching an HTTP object.|3|
-
-
-
-## IngestSegmentFirehose
-
-This Firehose can be used to read the data from existing druid segments, potentially using a new schema and changing the name, dimensions, metrics, rollup, etc. of the segment.
-This Firehose is _splittable_ and can be used by [native parallel index tasks](native-batch.md).
-This firehose will accept any type of parser, but will only utilize the list of dimensions and the timestamp specification.
- A sample ingest Firehose spec is shown below:
-
-```json
-{
- "type": "ingestSegment",
- "dataSource": "wikipedia",
- "interval": "2013-01-01/2013-01-02"
-}
-```
-
-|property|description|required?|
-|--------|-----------|---------|
-|type|This should be "ingestSegment".|yes|
-|dataSource|A String defining the data source to fetch rows from, very similar to a table in a relational database|yes|
-|interval|A String representing the ISO-8601 interval. This defines the time range to fetch the data over.|yes|
-|dimensions|The list of dimensions to select. If left empty, no dimensions are returned. If left null or not defined, all dimensions are returned. |no|
-|metrics|The list of metrics to select. If left empty, no metrics are returned. If left null or not defined, all metrics are selected.|no|
-|filter| See [Filters](../querying/filters.md)|no|
-|maxInputSegmentBytesPerTask|Deprecated. Use [Segments Split Hint Spec](./native-batch.md#segments-split-hint-spec) instead. When used with the native parallel index task, the maximum number of bytes of input segments to process in a single task. If a single segment is larger than this number, it will be processed by itself in a single task (input segments are never split across tasks). Defaults to 150MB.|no|
-
-
-
-## SqlFirehose
-
-This Firehose can be used to ingest events residing in an RDBMS. The database connection information is provided as part of the ingestion spec.
-For each query, the results are fetched locally and indexed.
-If there are multiple queries from which data needs to be indexed, queries are prefetched in the background, up to `maxFetchCapacityBytes` bytes.
-This Firehose is _splittable_ and can be used by [native parallel index tasks](native-batch.md).
-This firehose will accept any type of parser, but will only utilize the list of dimensions and the timestamp specification. See the extension documentation for more detailed ingestion examples.
-
-Requires one of the following extensions:
- * [MySQL Metadata Store](../development/extensions-core/mysql.md).
- * [PostgreSQL Metadata Store](../development/extensions-core/postgresql.md).
-
-
-```json
-{
- "type": "sql",
- "database": {
- "type": "mysql",
- "connectorConfig": {
- "connectURI": "jdbc:mysql://host:port/schema",
- "user": "user",
- "password": "password"
- }
- },
- "sqls": ["SELECT * FROM table1", "SELECT * FROM table2"]
-}
-```
-
-|property|description|default|required?|
-|--------|-----------|-------|---------|
-|type|This should be "sql".||Yes|
-|database|Specifies the database connection details. The database type corresponds to the extension that supplies the `connectorConfig` support. The specified extension must be loaded into Druid:<br/><br/>[mysql-metadata-storage](../development/extensions-core/mysql.md) for `mysql`<br/>[postgresql-metadata-storage](../development/extensions-core/postgresql.md) extension for `postgresql`.<br/><br/>You can selectively allow JDBC properties in `connectURI`. See [JDBC connections security config](../configuration/index.md#jdbc-connections-to-external-databases) for more details.||Yes|
-|maxCacheCapacityBytes|Maximum size of the cache space in bytes. 0 means disabling cache. Cached files are not removed until the ingestion task completes.|1073741824|No|
-|maxFetchCapacityBytes|Maximum size of the fetch space in bytes. 0 means disabling prefetch. Prefetched files are removed immediately once they are read.|1073741824|No|
-|prefetchTriggerBytes|Threshold to trigger prefetching SQL result objects.|maxFetchCapacityBytes / 2|No|
-|fetchTimeout|Timeout for fetching the result set.|60000|No|
-|foldCase|Toggle case folding of database column names. This may be enabled in cases where the database returns case insensitive column names in query results.|false|No|
-|sqls|List of SQL queries where each SQL query would retrieve the data to be indexed.||Yes|
-
-### Database
-
-|property|description|default|required?|
-|--------|-----------|-------|---------|
-|type|The type of database to query. Valid values are `mysql` and `postgresql`_||Yes|
-|connectorConfig|Specify the database connection properties via `connectURI`, `user` and `password`||Yes|
-
-## InlineFirehose
-
-This Firehose can be used to read the data inlined in its own spec.
-It can be used for demos or for quickly testing out parsing and schema, and works with `string` typed parsers.
-A sample inline Firehose spec is shown below:
-
-```json
-{
- "type": "inline",
- "data": "0,values,formatted\n1,as,CSV"
-}
-```
-
-|property|description|required?|
-|--------|-----------|---------|
-|type|This should be "inline".|yes|
-|data|Inlined data to ingest.|yes|
-
-## CombiningFirehose
-
-This Firehose can be used to combine and merge data from a list of different Firehoses.
-
-```json
-{
- "type": "combining",
- "delegates": [ { firehose1 }, { firehose2 }, ... ]
-}
-```
-
-|property|description|required?|
-|--------|-----------|---------|
-|type|This should be "combining"|yes|
-|delegates|List of Firehoses to combine data from|yes|
\ No newline at end of file
diff --git a/docs/operations/metrics.md b/docs/operations/metrics.md
index 1d37169684e1..b5be94ba4ba7 100644
--- a/docs/operations/metrics.md
+++ b/docs/operations/metrics.md
@@ -428,15 +428,6 @@ These metrics are available only when `druid.zk.service.enabled = true`.
|`zk/connected`|Indicator of connection status. `1` for connected, `0` for disconnected. Emitted once per monitor period.|None|1|
|`zk/reconnect/time`|Amount of time, in milliseconds, that a server was disconnected from ZooKeeper before reconnecting. Emitted on reconnection. Not emitted if connection to ZooKeeper is permanently lost, because in this case, there is no reconnection.|None|Not present|
-### EventReceiverFirehose
-
-The following metric is only available if the `EventReceiverFirehoseMonitor` module is included.
-
-|Metric|Description|Dimensions|Normal value|
-|------|-----------|----------|------------|
-|`ingest/events/buffered`|Number of events queued in the `EventReceiverFirehose` buffer.|`serviceName`, `dataSource`, `taskId`, `taskType`, `bufferCapacity`|Equal to the current number of events in the buffer queue.|
-|`ingest/bytes/received`|Number of bytes received by the `EventReceiverFirehose`.|`serviceName`, `dataSource`, `taskId`, `taskType`|Varies|
-
## Sys [Deprecated]
> SysMonitor is now deprecated and will be removed in future releases.
diff --git a/docs/operations/migrate-from-firehose-ingestion.md b/docs/operations/migrate-from-firehose-ingestion.md
index f470324b7f4e..540685a717f1 100644
--- a/docs/operations/migrate-from-firehose-ingestion.md
+++ b/docs/operations/migrate-from-firehose-ingestion.md
@@ -23,9 +23,7 @@ sidebar_label: "Migrate from firehose"
~ under the License.
-->
-Apache deprecated support for Druid firehoses in version 0.17. Support for firehose ingestion will be removed in version 26.0.
-
-If you're using a firehose for batch ingestion, we strongly recommend that you follow the instructions on this page to transition to using native batch ingestion input sources as soon as possible.
+Apache deprecated support for Druid firehoses in version 0.17. Support for firehose ingestion was removed in version 26.0.
Firehose ingestion doesn't work with newer Druid versions, so you must be using an ingestion spec with a defined input source before you upgrade.
diff --git a/docs/operations/security-overview.md b/docs/operations/security-overview.md
index 279a1327b97d..96389c8a72c7 100644
--- a/docs/operations/security-overview.md
+++ b/docs/operations/security-overview.md
@@ -52,7 +52,7 @@ The following recommendations apply to the network where Druid runs:
The following recommendation applies to Druid's authorization and authentication model:
* Only grant `WRITE` permissions to any `DATASOURCE` to trusted users. Druid's trust model assumes those users have the same privileges as the operating system user that runs the web console process. Additionally, users with `WRITE` permissions can make changes to datasources and they have access to both task and supervisor update (POST) APIs which may affect ingestion.
* Only grant `STATE READ`, `STATE WRITE`, `CONFIG WRITE`, and `DATASOURCE WRITE` permissions to highly-trusted users. These permissions allow users to access resources on behalf of the Druid server process regardless of the datasource.
-* If your Druid client application allows less-trusted users to control the input source or firehose of an ingestion task, validate the URLs from the users. It is possible to point unchecked URLs to other locations and resources within your network or local file system.
+* If your Druid client application allows less-trusted users to control the input source of an ingestion task, validate the URLs from the users. It is possible to point unchecked URLs to other locations and resources within your network or local file system.
## Enable TLS
diff --git a/extensions-contrib/kubernetes-overlord-extensions/src/test/java/org/apache/druid/k8s/overlord/common/K8sTestUtils.java b/extensions-contrib/kubernetes-overlord-extensions/src/test/java/org/apache/druid/k8s/overlord/common/K8sTestUtils.java
index 04b332aac850..b3fda99b222e 100644
--- a/extensions-contrib/kubernetes-overlord-extensions/src/test/java/org/apache/druid/k8s/overlord/common/K8sTestUtils.java
+++ b/extensions-contrib/kubernetes-overlord-extensions/src/test/java/org/apache/druid/k8s/overlord/common/K8sTestUtils.java
@@ -79,7 +79,6 @@ public static Task getTask()
null
),
new IndexTask.IndexIOConfig(
- null,
new LocalInputSource(new File("lol"), "rofl"),
new NoopInputFormat(),
true,
diff --git a/extensions-contrib/kubernetes-overlord-extensions/src/test/java/org/apache/druid/k8s/overlord/taskadapter/DruidPeonClientIntegrationTest.java b/extensions-contrib/kubernetes-overlord-extensions/src/test/java/org/apache/druid/k8s/overlord/taskadapter/DruidPeonClientIntegrationTest.java
index 241b4d9fc68f..e2d97accc97f 100644
--- a/extensions-contrib/kubernetes-overlord-extensions/src/test/java/org/apache/druid/k8s/overlord/taskadapter/DruidPeonClientIntegrationTest.java
+++ b/extensions-contrib/kubernetes-overlord-extensions/src/test/java/org/apache/druid/k8s/overlord/taskadapter/DruidPeonClientIntegrationTest.java
@@ -19,7 +19,6 @@
package org.apache.druid.k8s.overlord.taskadapter;
-import com.fasterxml.jackson.databind.Module;
import com.fasterxml.jackson.databind.ObjectMapper;
import com.fasterxml.jackson.databind.jsontype.NamedType;
import io.fabric8.kubernetes.api.model.Pod;
@@ -27,7 +26,6 @@
import io.fabric8.kubernetes.api.model.batch.v1.Job;
import org.apache.commons.io.FileUtils;
import org.apache.commons.io.IOUtils;
-import org.apache.druid.guice.FirehoseModule;
import org.apache.druid.indexing.common.TestUtils;
import org.apache.druid.indexing.common.config.TaskConfig;
import org.apache.druid.indexing.common.config.TaskConfigBuilder;
@@ -83,9 +81,6 @@ public void setup()
{
TestUtils utils = new TestUtils();
jsonMapper = utils.getTestObjectMapper();
- for (Module jacksonModule : new FirehoseModule().getJacksonModules()) {
- jsonMapper.registerModule(jacksonModule);
- }
jsonMapper.registerSubtypes(
new NamedType(ParallelIndexTuningConfig.class, "index_parallel"),
new NamedType(IndexTask.IndexTuningConfig.class, "index")
diff --git a/extensions-contrib/kubernetes-overlord-extensions/src/test/java/org/apache/druid/k8s/overlord/taskadapter/K8sTaskAdapterTest.java b/extensions-contrib/kubernetes-overlord-extensions/src/test/java/org/apache/druid/k8s/overlord/taskadapter/K8sTaskAdapterTest.java
index 102565efc35a..f8e7186a0263 100644
--- a/extensions-contrib/kubernetes-overlord-extensions/src/test/java/org/apache/druid/k8s/overlord/taskadapter/K8sTaskAdapterTest.java
+++ b/extensions-contrib/kubernetes-overlord-extensions/src/test/java/org/apache/druid/k8s/overlord/taskadapter/K8sTaskAdapterTest.java
@@ -19,7 +19,6 @@
package org.apache.druid.k8s.overlord.taskadapter;
-import com.fasterxml.jackson.databind.Module;
import com.fasterxml.jackson.databind.ObjectMapper;
import com.fasterxml.jackson.databind.jsontype.NamedType;
import com.google.common.base.Joiner;
@@ -44,7 +43,6 @@
import org.apache.commons.lang.RandomStringUtils;
import org.apache.commons.lang.StringUtils;
import org.apache.druid.error.DruidException;
-import org.apache.druid.guice.FirehoseModule;
import org.apache.druid.indexing.common.TestUtils;
import org.apache.druid.indexing.common.config.TaskConfig;
import org.apache.druid.indexing.common.config.TaskConfigBuilder;
@@ -101,9 +99,6 @@ public K8sTaskAdapterTest()
{
TestUtils utils = new TestUtils();
jsonMapper = utils.getTestObjectMapper();
- for (Module jacksonModule : new FirehoseModule().getJacksonModules()) {
- jsonMapper.registerModule(jacksonModule);
- }
jsonMapper.registerSubtypes(
new NamedType(ParallelIndexTuningConfig.class, "index_parallel"),
new NamedType(IndexTask.IndexTuningConfig.class, "index")
diff --git a/extensions-contrib/kubernetes-overlord-extensions/src/test/java/org/apache/druid/k8s/overlord/taskadapter/MultiContainerTaskAdapterTest.java b/extensions-contrib/kubernetes-overlord-extensions/src/test/java/org/apache/druid/k8s/overlord/taskadapter/MultiContainerTaskAdapterTest.java
index 58993b9a6a00..9308835d967f 100644
--- a/extensions-contrib/kubernetes-overlord-extensions/src/test/java/org/apache/druid/k8s/overlord/taskadapter/MultiContainerTaskAdapterTest.java
+++ b/extensions-contrib/kubernetes-overlord-extensions/src/test/java/org/apache/druid/k8s/overlord/taskadapter/MultiContainerTaskAdapterTest.java
@@ -19,7 +19,6 @@
package org.apache.druid.k8s.overlord.taskadapter;
-import com.fasterxml.jackson.databind.Module;
import com.fasterxml.jackson.databind.ObjectMapper;
import com.fasterxml.jackson.databind.jsontype.NamedType;
import com.google.common.collect.ImmutableList;
@@ -28,7 +27,6 @@
import io.fabric8.kubernetes.api.model.batch.v1.Job;
import io.fabric8.kubernetes.client.KubernetesClient;
import io.fabric8.kubernetes.client.server.mock.EnableKubernetesMockClient;
-import org.apache.druid.guice.FirehoseModule;
import org.apache.druid.indexing.common.TestUtils;
import org.apache.druid.indexing.common.config.TaskConfig;
import org.apache.druid.indexing.common.config.TaskConfigBuilder;
@@ -67,9 +65,6 @@ public void setup()
{
TestUtils utils = new TestUtils();
jsonMapper = utils.getTestObjectMapper();
- for (Module jacksonModule : new FirehoseModule().getJacksonModules()) {
- jsonMapper.registerModule(jacksonModule);
- }
jsonMapper.registerSubtypes(
new NamedType(ParallelIndexTuningConfig.class, "index_parallel"),
new NamedType(IndexTask.IndexTuningConfig.class, "index")
diff --git a/extensions-contrib/kubernetes-overlord-extensions/src/test/java/org/apache/druid/k8s/overlord/taskadapter/SingleContainerTaskAdapterTest.java b/extensions-contrib/kubernetes-overlord-extensions/src/test/java/org/apache/druid/k8s/overlord/taskadapter/SingleContainerTaskAdapterTest.java
index afc5299927fa..4cf9c3e4cbbc 100644
--- a/extensions-contrib/kubernetes-overlord-extensions/src/test/java/org/apache/druid/k8s/overlord/taskadapter/SingleContainerTaskAdapterTest.java
+++ b/extensions-contrib/kubernetes-overlord-extensions/src/test/java/org/apache/druid/k8s/overlord/taskadapter/SingleContainerTaskAdapterTest.java
@@ -19,14 +19,12 @@
package org.apache.druid.k8s.overlord.taskadapter;
-import com.fasterxml.jackson.databind.Module;
import com.fasterxml.jackson.databind.ObjectMapper;
import com.fasterxml.jackson.databind.jsontype.NamedType;
import io.fabric8.kubernetes.api.model.Pod;
import io.fabric8.kubernetes.api.model.batch.v1.Job;
import io.fabric8.kubernetes.client.KubernetesClient;
import io.fabric8.kubernetes.client.server.mock.EnableKubernetesMockClient;
-import org.apache.druid.guice.FirehoseModule;
import org.apache.druid.indexing.common.TestUtils;
import org.apache.druid.indexing.common.config.TaskConfig;
import org.apache.druid.indexing.common.config.TaskConfigBuilder;
@@ -66,9 +64,6 @@ public void setup()
{
TestUtils utils = new TestUtils();
jsonMapper = utils.getTestObjectMapper();
- for (Module jacksonModule : new FirehoseModule().getJacksonModules()) {
- jsonMapper.registerModule(jacksonModule);
- }
jsonMapper.registerSubtypes(
new NamedType(ParallelIndexTuningConfig.class, "index_parallel"),
new NamedType(IndexTask.IndexTuningConfig.class, "index")
diff --git a/extensions-contrib/materialized-view-maintenance/src/main/java/org/apache/druid/indexing/materializedview/MaterializedViewSupervisorSpec.java b/extensions-contrib/materialized-view-maintenance/src/main/java/org/apache/druid/indexing/materializedview/MaterializedViewSupervisorSpec.java
index 7d59cb63ea5b..01039375259e 100644
--- a/extensions-contrib/materialized-view-maintenance/src/main/java/org/apache/druid/indexing/materializedview/MaterializedViewSupervisorSpec.java
+++ b/extensions-contrib/materialized-view-maintenance/src/main/java/org/apache/druid/indexing/materializedview/MaterializedViewSupervisorSpec.java
@@ -47,7 +47,7 @@
import org.apache.druid.query.aggregation.AggregatorFactory;
import org.apache.druid.segment.indexing.DataSchema;
import org.apache.druid.segment.indexing.granularity.ArbitraryGranularitySpec;
-import org.apache.druid.segment.realtime.firehose.ChatHandlerProvider;
+import org.apache.druid.segment.realtime.ChatHandlerProvider;
import org.apache.druid.segment.transform.TransformSpec;
import org.apache.druid.server.security.AuthorizerMapper;
import org.apache.druid.timeline.DataSegment;
diff --git a/extensions-contrib/materialized-view-maintenance/src/test/java/org/apache/druid/indexing/materializedview/MaterializedViewSupervisorSpecTest.java b/extensions-contrib/materialized-view-maintenance/src/test/java/org/apache/druid/indexing/materializedview/MaterializedViewSupervisorSpecTest.java
index 14bd59871253..65e71f626e8e 100644
--- a/extensions-contrib/materialized-view-maintenance/src/test/java/org/apache/druid/indexing/materializedview/MaterializedViewSupervisorSpecTest.java
+++ b/extensions-contrib/materialized-view-maintenance/src/test/java/org/apache/druid/indexing/materializedview/MaterializedViewSupervisorSpecTest.java
@@ -40,8 +40,8 @@
import org.apache.druid.query.aggregation.LongSumAggregatorFactory;
import org.apache.druid.query.expression.LookupEnabledTestExprMacroTable;
import org.apache.druid.segment.TestHelper;
-import org.apache.druid.segment.realtime.firehose.ChatHandlerProvider;
-import org.apache.druid.segment.realtime.firehose.NoopChatHandlerProvider;
+import org.apache.druid.segment.realtime.ChatHandlerProvider;
+import org.apache.druid.segment.realtime.NoopChatHandlerProvider;
import org.apache.druid.server.security.AuthorizerMapper;
import org.easymock.EasyMock;
import org.hamcrest.CoreMatchers;
diff --git a/extensions-contrib/materialized-view-maintenance/src/test/java/org/apache/druid/indexing/materializedview/MaterializedViewSupervisorTest.java b/extensions-contrib/materialized-view-maintenance/src/test/java/org/apache/druid/indexing/materializedview/MaterializedViewSupervisorTest.java
index 38f15a840dc2..ff1759195869 100644
--- a/extensions-contrib/materialized-view-maintenance/src/test/java/org/apache/druid/indexing/materializedview/MaterializedViewSupervisorTest.java
+++ b/extensions-contrib/materialized-view-maintenance/src/test/java/org/apache/druid/indexing/materializedview/MaterializedViewSupervisorTest.java
@@ -52,7 +52,7 @@
import org.apache.druid.segment.indexing.DataSchema;
import org.apache.druid.segment.metadata.CentralizedDatasourceSchemaConfig;
import org.apache.druid.segment.metadata.SegmentSchemaManager;
-import org.apache.druid.segment.realtime.firehose.ChatHandlerProvider;
+import org.apache.druid.segment.realtime.ChatHandlerProvider;
import org.apache.druid.segment.transform.TransformSpec;
import org.apache.druid.server.security.AuthorizerMapper;
import org.apache.druid.timeline.DataSegment;
diff --git a/extensions-contrib/prometheus-emitter/src/main/resources/defaultMetrics.json b/extensions-contrib/prometheus-emitter/src/main/resources/defaultMetrics.json
index 7e625c9edb3d..9cb01646f209 100644
--- a/extensions-contrib/prometheus-emitter/src/main/resources/defaultMetrics.json
+++ b/extensions-contrib/prometheus-emitter/src/main/resources/defaultMetrics.json
@@ -201,9 +201,6 @@
"zk/connected" : { "dimensions" : [], "type" : "gauge", "help": "Indicator of connection status to zookeeper."},
"zk/reconnect/time" : { "dimensions" : [], "type" : "timer", "conversionFactor": 1000.0, "help": "Amount of time, in seconds, that a server was disconnected from ZooKeeper before reconnecting." },
- "ingest/events/buffered" : { "dimensions" : ["serviceName", "dataSource", "bufferCapacity"], "type" : "gauge", "help": "Number of events queued in the EventReceiverFirehose's buffer"},
- "ingest/bytes/received" : { "dimensions" : ["serviceName", "dataSource"], "type" : "gauge", "help": "Number of bytes received by the EventReceiverFirehose."},
-
"sys/swap/free" : { "dimensions" : [], "type" : "gauge", "help": "Free swap"},
"sys/swap/max" : { "dimensions" : [], "type" : "gauge", "help": "Max swap"},
"sys/swap/pageIn" : { "dimensions" : [], "type" : "gauge", "help": "Paged in swap"},
diff --git a/extensions-core/azure-extensions/pom.xml b/extensions-core/azure-extensions/pom.xml
index 88a1198b61e1..7f0f767af851 100644
--- a/extensions-core/azure-extensions/pom.xml
+++ b/extensions-core/azure-extensions/pom.xml
@@ -204,10 +204,6 @@
org.jacocojacoco-maven-plugin
-
-
- org/apache/druid/firehose/azure/**/*
- BUNDLE
diff --git a/extensions-core/kinesis-indexing-service/src/test/java/org/apache/druid/indexing/kinesis/KinesisIndexTaskSerdeTest.java b/extensions-core/kinesis-indexing-service/src/test/java/org/apache/druid/indexing/kinesis/KinesisIndexTaskSerdeTest.java
index da04e3ab0a6d..60d8f686a28c 100644
--- a/extensions-core/kinesis-indexing-service/src/test/java/org/apache/druid/indexing/kinesis/KinesisIndexTaskSerdeTest.java
+++ b/extensions-core/kinesis-indexing-service/src/test/java/org/apache/druid/indexing/kinesis/KinesisIndexTaskSerdeTest.java
@@ -33,9 +33,9 @@
import org.apache.druid.initialization.Initialization;
import org.apache.druid.segment.incremental.RowIngestionMetersFactory;
import org.apache.druid.segment.indexing.DataSchema;
+import org.apache.druid.segment.realtime.ChatHandlerProvider;
+import org.apache.druid.segment.realtime.NoopChatHandlerProvider;
import org.apache.druid.segment.realtime.appenderator.AppenderatorsManager;
-import org.apache.druid.segment.realtime.firehose.ChatHandlerProvider;
-import org.apache.druid.segment.realtime.firehose.NoopChatHandlerProvider;
import org.apache.druid.server.security.Action;
import org.apache.druid.server.security.Resource;
import org.apache.druid.server.security.ResourceAction;
diff --git a/extensions-core/multi-stage-query/src/main/java/org/apache/druid/msq/indexing/IndexerControllerContext.java b/extensions-core/multi-stage-query/src/main/java/org/apache/druid/msq/indexing/IndexerControllerContext.java
index e960a8aea29c..0fef9d32e6d4 100644
--- a/extensions-core/multi-stage-query/src/main/java/org/apache/druid/msq/indexing/IndexerControllerContext.java
+++ b/extensions-core/multi-stage-query/src/main/java/org/apache/druid/msq/indexing/IndexerControllerContext.java
@@ -53,7 +53,7 @@
import org.apache.druid.query.QueryContext;
import org.apache.druid.rpc.ServiceClientFactory;
import org.apache.druid.rpc.indexing.OverlordClient;
-import org.apache.druid.segment.realtime.firehose.ChatHandler;
+import org.apache.druid.segment.realtime.ChatHandler;
import org.apache.druid.server.DruidNode;
import org.apache.druid.server.lookup.cache.LookupLoadingSpec;
diff --git a/extensions-core/multi-stage-query/src/main/java/org/apache/druid/msq/indexing/client/ControllerChatHandler.java b/extensions-core/multi-stage-query/src/main/java/org/apache/druid/msq/indexing/client/ControllerChatHandler.java
index bf3dd4a6bf14..d161b01bd0b6 100644
--- a/extensions-core/multi-stage-query/src/main/java/org/apache/druid/msq/indexing/client/ControllerChatHandler.java
+++ b/extensions-core/multi-stage-query/src/main/java/org/apache/druid/msq/indexing/client/ControllerChatHandler.java
@@ -22,7 +22,7 @@
import org.apache.druid.msq.exec.Controller;
import org.apache.druid.msq.indexing.IndexerResourcePermissionMapper;
import org.apache.druid.msq.rpc.ControllerResource;
-import org.apache.druid.segment.realtime.firehose.ChatHandler;
+import org.apache.druid.segment.realtime.ChatHandler;
import org.apache.druid.server.security.AuthorizerMapper;
public class ControllerChatHandler extends ControllerResource implements ChatHandler
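
For extension or test code that defines its own chat handlers, this change is import-only: the interface moved from org.apache.druid.segment.realtime.firehose to org.apache.druid.segment.realtime. A minimal sketch of a handler after the move (the class name and resource path below are hypothetical; ChatHandler is a marker interface, so the JAX-RS annotations supply the behavior):

    import javax.ws.rs.GET;
    import javax.ws.rs.Path;
    import javax.ws.rs.Produces;
    import javax.ws.rs.core.MediaType;
    import javax.ws.rs.core.Response;
    import org.apache.druid.segment.realtime.ChatHandler;

    @Path("/druid/worker/v1/chat/example")
    public class ExampleChatHandler implements ChatHandler
    {
      // Trivial status endpoint; a real handler would also register itself with a ChatHandlerProvider.
      @GET
      @Path("/status")
      @Produces(MediaType.APPLICATION_JSON)
      public Response getStatus()
      {
        return Response.ok("{\"status\":\"ok\"}").build();
      }
    }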
diff --git a/extensions-core/multi-stage-query/src/main/java/org/apache/druid/msq/indexing/client/WorkerChatHandler.java b/extensions-core/multi-stage-query/src/main/java/org/apache/druid/msq/indexing/client/WorkerChatHandler.java
index 4ef6ab077cac..70d1ab11d380 100644
--- a/extensions-core/multi-stage-query/src/main/java/org/apache/druid/msq/indexing/client/WorkerChatHandler.java
+++ b/extensions-core/multi-stage-query/src/main/java/org/apache/druid/msq/indexing/client/WorkerChatHandler.java
@@ -33,8 +33,8 @@
import org.apache.druid.msq.kernel.WorkOrder;
import org.apache.druid.msq.statistics.ClusterByStatisticsSnapshot;
import org.apache.druid.msq.statistics.serde.ClusterByStatisticsSnapshotSerde;
-import org.apache.druid.segment.realtime.firehose.ChatHandler;
-import org.apache.druid.segment.realtime.firehose.ChatHandlers;
+import org.apache.druid.segment.realtime.ChatHandler;
+import org.apache.druid.segment.realtime.ChatHandlers;
import org.apache.druid.server.security.Action;
import org.apache.druid.utils.CloseableUtils;
diff --git a/extensions-core/multi-stage-query/src/test/java/org/apache/druid/msq/test/MSQTestWorkerContext.java b/extensions-core/multi-stage-query/src/test/java/org/apache/druid/msq/test/MSQTestWorkerContext.java
index ad05c20b5829..14f6f73b24ab 100644
--- a/extensions-core/multi-stage-query/src/test/java/org/apache/druid/msq/test/MSQTestWorkerContext.java
+++ b/extensions-core/multi-stage-query/src/test/java/org/apache/druid/msq/test/MSQTestWorkerContext.java
@@ -44,7 +44,7 @@
import org.apache.druid.segment.column.ColumnConfig;
import org.apache.druid.segment.incremental.NoopRowIngestionMeters;
import org.apache.druid.segment.loading.DataSegmentPusher;
-import org.apache.druid.segment.realtime.firehose.NoopChatHandlerProvider;
+import org.apache.druid.segment.realtime.NoopChatHandlerProvider;
import org.apache.druid.segment.writeout.OffHeapMemorySegmentWriteOutMediumFactory;
import org.apache.druid.server.DruidNode;
import org.apache.druid.server.coordination.DataSegmentAnnouncer;
diff --git a/extensions-core/mysql-metadata-storage/src/main/java/org/apache/druid/firehose/sql/MySQLFirehoseDatabaseConnector.java b/extensions-core/mysql-metadata-storage/src/main/java/org/apache/druid/metadata/input/MySQLInputSourceDatabaseConnector.java
similarity index 91%
rename from extensions-core/mysql-metadata-storage/src/main/java/org/apache/druid/firehose/sql/MySQLFirehoseDatabaseConnector.java
rename to extensions-core/mysql-metadata-storage/src/main/java/org/apache/druid/metadata/input/MySQLInputSourceDatabaseConnector.java
index 07f48f36cd39..4647434aba10 100644
--- a/extensions-core/mysql-metadata-storage/src/main/java/org/apache/druid/firehose/sql/MySQLFirehoseDatabaseConnector.java
+++ b/extensions-core/mysql-metadata-storage/src/main/java/org/apache/druid/metadata/input/MySQLInputSourceDatabaseConnector.java
@@ -17,7 +17,7 @@
* under the License.
*/
-package org.apache.druid.firehose.sql;
+package org.apache.druid.metadata.input;
import com.fasterxml.jackson.annotation.JacksonInject;
import com.fasterxml.jackson.annotation.JsonCreator;
@@ -25,7 +25,7 @@
import com.fasterxml.jackson.annotation.JsonTypeName;
import org.apache.commons.dbcp2.BasicDataSource;
import org.apache.druid.metadata.MetadataStorageConnectorConfig;
-import org.apache.druid.metadata.SQLFirehoseDatabaseConnector;
+import org.apache.druid.metadata.SQLInputSourceDatabaseConnector;
import org.apache.druid.metadata.storage.mysql.MySQLConnectorDriverConfig;
import org.apache.druid.server.initialization.JdbcAccessSecurityConfig;
import org.apache.druid.utils.ConnectionUriUtils;
@@ -37,7 +37,7 @@
@JsonTypeName("mysql")
-public class MySQLFirehoseDatabaseConnector extends SQLFirehoseDatabaseConnector
+public class MySQLInputSourceDatabaseConnector extends SQLInputSourceDatabaseConnector
{
private final DBI dbi;
private final MetadataStorageConnectorConfig connectorConfig;
@@ -45,7 +45,7 @@ public class MySQLFirehoseDatabaseConnector extends SQLFirehoseDatabaseConnector
private final String driverClassName;
@JsonCreator
- public MySQLFirehoseDatabaseConnector(
+ public MySQLInputSourceDatabaseConnector(
@JsonProperty("connectorConfig") MetadataStorageConnectorConfig connectorConfig,
@JsonProperty("driverClassName") @Nullable String driverClassName,
@JacksonInject JdbcAccessSecurityConfig securityConfig,
@@ -98,7 +98,7 @@ public boolean equals(Object o)
if (o == null || getClass() != o.getClass()) {
return false;
}
- MySQLFirehoseDatabaseConnector that = (MySQLFirehoseDatabaseConnector) o;
+ MySQLInputSourceDatabaseConnector that = (MySQLInputSourceDatabaseConnector) o;
return connectorConfig.equals(that.connectorConfig) && Objects.equals(
driverClassName,
that.driverClassName
diff --git a/extensions-core/mysql-metadata-storage/src/main/java/org/apache/druid/metadata/storage/mysql/MySQLMetadataStorageModule.java b/extensions-core/mysql-metadata-storage/src/main/java/org/apache/druid/metadata/storage/mysql/MySQLMetadataStorageModule.java
index 2cb8cc7dc79a..3f62e1d7eb36 100644
--- a/extensions-core/mysql-metadata-storage/src/main/java/org/apache/druid/metadata/storage/mysql/MySQLMetadataStorageModule.java
+++ b/extensions-core/mysql-metadata-storage/src/main/java/org/apache/druid/metadata/storage/mysql/MySQLMetadataStorageModule.java
@@ -24,7 +24,6 @@
import com.fasterxml.jackson.databind.module.SimpleModule;
import com.google.inject.Binder;
import com.google.inject.Key;
-import org.apache.druid.firehose.sql.MySQLFirehoseDatabaseConnector;
import org.apache.druid.guice.JsonConfigProvider;
import org.apache.druid.guice.LazySingleton;
import org.apache.druid.guice.PolyBind;
@@ -36,6 +35,7 @@
import org.apache.druid.metadata.MySQLMetadataStorageActionHandlerFactory;
import org.apache.druid.metadata.NoopMetadataStorageProvider;
import org.apache.druid.metadata.SQLMetadataConnector;
+import org.apache.druid.metadata.input.MySQLInputSourceDatabaseConnector;
import java.util.Collections;
import java.util.List;
@@ -55,7 +55,7 @@ public List<? extends Module> getJacksonModules()
return Collections.singletonList(
new SimpleModule()
.registerSubtypes(
- new NamedType(MySQLFirehoseDatabaseConnector.class, "mysql")
+ new NamedType(MySQLInputSourceDatabaseConnector.class, "mysql")
)
);
}
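
The Jackson type name is unchanged by the rename, so specs that reference the connector as type "mysql" keep deserializing. A minimal round-trip sketch mirroring the test below (connectorConfig, securityConfig, driverConfig, and mapper are assumed to be set up as in that test, including the injectable JdbcAccessSecurityConfig):

    MySQLInputSourceDatabaseConnector connector = new MySQLInputSourceDatabaseConnector(
        connectorConfig,   // MetadataStorageConnectorConfig pointing at the source database
        null,              // driverClassName: null falls back to the configured default driver
        securityConfig,    // JdbcAccessSecurityConfig
        driverConfig       // MySQLConnectorDriverConfig
    );
    MySQLInputSourceDatabaseConnector andBack = mapper.readValue(
        mapper.writeValueAsString(connector),
        MySQLInputSourceDatabaseConnector.class
    );
    // andBack.equals(connector) holds, as exercised by MySQLInputSourceDatabaseConnectorTest.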
diff --git a/extensions-core/mysql-metadata-storage/src/test/java/org/apache/druid/firehose/sql/MySQLFirehoseDatabaseConnectorTest.java b/extensions-core/mysql-metadata-storage/src/test/java/org/apache/druid/metadata/input/MySQLInputSourceDatabaseConnectorTest.java
similarity index 92%
rename from extensions-core/mysql-metadata-storage/src/test/java/org/apache/druid/firehose/sql/MySQLFirehoseDatabaseConnectorTest.java
rename to extensions-core/mysql-metadata-storage/src/test/java/org/apache/druid/metadata/input/MySQLInputSourceDatabaseConnectorTest.java
index 43d96961c1ba..1ed7214aa961 100644
--- a/extensions-core/mysql-metadata-storage/src/test/java/org/apache/druid/firehose/sql/MySQLFirehoseDatabaseConnectorTest.java
+++ b/extensions-core/mysql-metadata-storage/src/test/java/org/apache/druid/metadata/input/MySQLInputSourceDatabaseConnectorTest.java
@@ -17,7 +17,7 @@
* under the License.
*/
-package org.apache.druid.firehose.sql;
+package org.apache.druid.metadata.input;
import com.fasterxml.jackson.core.JsonProcessingException;
import com.fasterxml.jackson.databind.InjectableValues;
@@ -42,7 +42,7 @@
import java.util.Set;
@RunWith(MockitoJUnitRunner.class)
-public class MySQLFirehoseDatabaseConnectorTest
+public class MySQLInputSourceDatabaseConnectorTest
{
private static final JdbcAccessSecurityConfig INJECTED_CONF = newSecurityConfigEnforcingAllowList(ImmutableSet.of());
@@ -76,33 +76,33 @@ public String getConnectURI()
return "jdbc:mysql://localhost:3306/test";
}
};
- MySQLFirehoseDatabaseConnector connector = new MySQLFirehoseDatabaseConnector(
+ MySQLInputSourceDatabaseConnector connector = new MySQLInputSourceDatabaseConnector(
connectorConfig,
null,
INJECTED_CONF,
mySQLConnectorDriverConfig
);
- MySQLFirehoseDatabaseConnector andBack = mapper.readValue(
+ MySQLInputSourceDatabaseConnector andBack = mapper.readValue(
mapper.writeValueAsString(connector),
- MySQLFirehoseDatabaseConnector.class
+ MySQLInputSourceDatabaseConnector.class
);
Assert.assertEquals(connector, andBack);
// test again with classname
- connector = new MySQLFirehoseDatabaseConnector(
+ connector = new MySQLInputSourceDatabaseConnector(
connectorConfig,
"some.class.name.Driver",
INJECTED_CONF,
mySQLConnectorDriverConfig
);
- andBack = mapper.readValue(mapper.writeValueAsString(connector), MySQLFirehoseDatabaseConnector.class);
+ andBack = mapper.readValue(mapper.writeValueAsString(connector), MySQLInputSourceDatabaseConnector.class);
Assert.assertEquals(connector, andBack);
}
@Test
public void testEqualsAndHashcode()
{
- EqualsVerifier.forClass(MySQLFirehoseDatabaseConnector.class)
+ EqualsVerifier.forClass(MySQLInputSourceDatabaseConnector.class)
.usingGetClass()
.withNonnullFields("connectorConfig")
.withIgnoredFields("dbi")
@@ -123,7 +123,7 @@ public String getConnectURI()
JdbcAccessSecurityConfig securityConfig = newSecurityConfigEnforcingAllowList(ImmutableSet.of());
- new MySQLFirehoseDatabaseConnector(
+ new MySQLInputSourceDatabaseConnector(
connectorConfig,
null,
securityConfig,
@@ -145,7 +145,7 @@ public String getConnectURI()
JdbcAccessSecurityConfig securityConfig = newSecurityConfigEnforcingAllowList(ImmutableSet.of("user"));
- new MySQLFirehoseDatabaseConnector(
+ new MySQLInputSourceDatabaseConnector(
connectorConfig,
null,
securityConfig,
@@ -170,7 +170,7 @@ public String getConnectURI()
expectedException.expectMessage("The property [password] is not in the allowed list");
expectedException.expect(IllegalArgumentException.class);
- new MySQLFirehoseDatabaseConnector(
+ new MySQLInputSourceDatabaseConnector(
connectorConfig,
null,
securityConfig,
@@ -194,7 +194,7 @@ public String getConnectURI()
ImmutableSet.of("user", "password", "keyonly", "etc")
);
- new MySQLFirehoseDatabaseConnector(
+ new MySQLInputSourceDatabaseConnector(
connectorConfig,
null,
securityConfig,
@@ -218,7 +218,7 @@ public String getConnectURI()
ImmutableSet.of("user", "password", "keyonly", "etc")
);
- new MySQLFirehoseDatabaseConnector(
+ new MySQLInputSourceDatabaseConnector(
connectorConfig,
null,
securityConfig,
@@ -244,7 +244,7 @@ public String getConnectURI()
expectedException.expectMessage("The property [password] is not in the allowed list");
expectedException.expect(IllegalArgumentException.class);
- new MySQLFirehoseDatabaseConnector(
+ new MySQLInputSourceDatabaseConnector(
connectorConfig,
null,
securityConfig,
@@ -269,7 +269,7 @@ public String getConnectURI()
expectedException.expectMessage("The property [password] is not in the allowed list");
expectedException.expect(IllegalArgumentException.class);
- new MySQLFirehoseDatabaseConnector(
+ new MySQLInputSourceDatabaseConnector(
connectorConfig,
null,
securityConfig,
@@ -294,7 +294,7 @@ public String getConnectURI()
expectedException.expectMessage("The property [password] is not in the allowed list");
expectedException.expect(IllegalArgumentException.class);
- new MySQLFirehoseDatabaseConnector(
+ new MySQLInputSourceDatabaseConnector(
connectorConfig,
null,
securityConfig,
@@ -329,7 +329,7 @@ public boolean isEnforceAllowedProperties()
}
};
- new MySQLFirehoseDatabaseConnector(
+ new MySQLInputSourceDatabaseConnector(
connectorConfig,
null,
securityConfig,
diff --git a/extensions-core/postgresql-metadata-storage/src/main/java/org/apache/druid/firehose/PostgresqlFirehoseDatabaseConnector.java b/extensions-core/postgresql-metadata-storage/src/main/java/org/apache/druid/metadata/input/PostgresqlInputSourceDatabaseConnector.java
similarity index 87%
rename from extensions-core/postgresql-metadata-storage/src/main/java/org/apache/druid/firehose/PostgresqlFirehoseDatabaseConnector.java
rename to extensions-core/postgresql-metadata-storage/src/main/java/org/apache/druid/metadata/input/PostgresqlInputSourceDatabaseConnector.java
index d9880d7acf2f..9812b213f59f 100644
--- a/extensions-core/postgresql-metadata-storage/src/main/java/org/apache/druid/firehose/PostgresqlFirehoseDatabaseConnector.java
+++ b/extensions-core/postgresql-metadata-storage/src/main/java/org/apache/druid/metadata/input/PostgresqlInputSourceDatabaseConnector.java
@@ -17,7 +17,7 @@
* under the License.
*/
-package org.apache.druid.firehose;
+package org.apache.druid.metadata.input;
import com.fasterxml.jackson.annotation.JacksonInject;
import com.fasterxml.jackson.annotation.JsonCreator;
@@ -25,7 +25,7 @@
import com.fasterxml.jackson.annotation.JsonTypeName;
import org.apache.commons.dbcp2.BasicDataSource;
import org.apache.druid.metadata.MetadataStorageConnectorConfig;
-import org.apache.druid.metadata.SQLFirehoseDatabaseConnector;
+import org.apache.druid.metadata.SQLInputSourceDatabaseConnector;
import org.apache.druid.server.initialization.JdbcAccessSecurityConfig;
import org.apache.druid.utils.ConnectionUriUtils;
import org.skife.jdbi.v2.DBI;
@@ -35,13 +35,13 @@
@JsonTypeName("postgresql")
-public class PostgresqlFirehoseDatabaseConnector extends SQLFirehoseDatabaseConnector
+public class PostgresqlInputSourceDatabaseConnector extends SQLInputSourceDatabaseConnector
{
private final DBI dbi;
private final MetadataStorageConnectorConfig connectorConfig;
@JsonCreator
- public PostgresqlFirehoseDatabaseConnector(
+ public PostgresqlInputSourceDatabaseConnector(
@JsonProperty("connectorConfig") MetadataStorageConnectorConfig connectorConfig,
@JacksonInject JdbcAccessSecurityConfig securityConfig
)
@@ -80,7 +80,7 @@ public boolean equals(Object o)
if (o == null || getClass() != o.getClass()) {
return false;
}
- PostgresqlFirehoseDatabaseConnector that = (PostgresqlFirehoseDatabaseConnector) o;
+ PostgresqlInputSourceDatabaseConnector that = (PostgresqlInputSourceDatabaseConnector) o;
return connectorConfig.equals(that.connectorConfig);
}
@@ -93,7 +93,7 @@ public int hashCode()
@Override
public String toString()
{
- return "PostgresqlFirehoseDatabaseConnector{" +
+ return "PostgresqlInputSourceDatabaseConnector{" +
"connectorConfig=" + connectorConfig +
'}';
}
diff --git a/extensions-core/postgresql-metadata-storage/src/main/java/org/apache/druid/metadata/storage/postgresql/PostgreSQLMetadataStorageModule.java b/extensions-core/postgresql-metadata-storage/src/main/java/org/apache/druid/metadata/storage/postgresql/PostgreSQLMetadataStorageModule.java
index 9506eddd04bf..31777d0f4314 100644
--- a/extensions-core/postgresql-metadata-storage/src/main/java/org/apache/druid/metadata/storage/postgresql/PostgreSQLMetadataStorageModule.java
+++ b/extensions-core/postgresql-metadata-storage/src/main/java/org/apache/druid/metadata/storage/postgresql/PostgreSQLMetadataStorageModule.java
@@ -24,7 +24,6 @@
import com.fasterxml.jackson.databind.module.SimpleModule;
import com.google.inject.Binder;
import com.google.inject.Key;
-import org.apache.druid.firehose.PostgresqlFirehoseDatabaseConnector;
import org.apache.druid.guice.JsonConfigProvider;
import org.apache.druid.guice.LazySingleton;
import org.apache.druid.guice.PolyBind;
@@ -36,6 +35,7 @@
import org.apache.druid.metadata.NoopMetadataStorageProvider;
import org.apache.druid.metadata.PostgreSQLMetadataStorageActionHandlerFactory;
import org.apache.druid.metadata.SQLMetadataConnector;
+import org.apache.druid.metadata.input.PostgresqlInputSourceDatabaseConnector;
import java.util.Collections;
import java.util.List;
@@ -56,7 +56,7 @@ public List<? extends Module> getJacksonModules()
return Collections.singletonList(
new SimpleModule()
.registerSubtypes(
- new NamedType(PostgresqlFirehoseDatabaseConnector.class, "postgresql")
+ new NamedType(PostgresqlInputSourceDatabaseConnector.class, "postgresql")
)
);
}
diff --git a/extensions-core/postgresql-metadata-storage/src/test/java/org/apache/druid/firehose/PostgresqlFirehoseDatabaseConnectorTest.java b/extensions-core/postgresql-metadata-storage/src/test/java/org/apache/druid/metadata/input/PostgresqlInputSourceDatabaseConnectorTest.java
similarity index 90%
rename from extensions-core/postgresql-metadata-storage/src/test/java/org/apache/druid/firehose/PostgresqlFirehoseDatabaseConnectorTest.java
rename to extensions-core/postgresql-metadata-storage/src/test/java/org/apache/druid/metadata/input/PostgresqlInputSourceDatabaseConnectorTest.java
index 9b93f0102c45..0d67d72aa848 100644
--- a/extensions-core/postgresql-metadata-storage/src/test/java/org/apache/druid/firehose/PostgresqlFirehoseDatabaseConnectorTest.java
+++ b/extensions-core/postgresql-metadata-storage/src/test/java/org/apache/druid/metadata/input/PostgresqlInputSourceDatabaseConnectorTest.java
@@ -17,7 +17,7 @@
* under the License.
*/
-package org.apache.druid.firehose;
+package org.apache.druid.metadata.input;
import com.fasterxml.jackson.core.JsonProcessingException;
import com.fasterxml.jackson.databind.InjectableValues;
@@ -35,7 +35,7 @@
import java.util.Set;
-public class PostgresqlFirehoseDatabaseConnectorTest
+public class PostgresqlInputSourceDatabaseConnectorTest
{
private static final ObjectMapper MAPPER = new DefaultObjectMapper();
private static final JdbcAccessSecurityConfig INJECTED_CONF = newSecurityConfigEnforcingAllowList(ImmutableSet.of());
@@ -60,13 +60,13 @@ public String getConnectURI()
return "jdbc:postgresql://localhost:3306/test";
}
};
- PostgresqlFirehoseDatabaseConnector connector = new PostgresqlFirehoseDatabaseConnector(
+ PostgresqlInputSourceDatabaseConnector connector = new PostgresqlInputSourceDatabaseConnector(
connectorConfig,
INJECTED_CONF
);
- PostgresqlFirehoseDatabaseConnector andBack = MAPPER.readValue(
+ PostgresqlInputSourceDatabaseConnector andBack = MAPPER.readValue(
MAPPER.writeValueAsString(connector),
- PostgresqlFirehoseDatabaseConnector.class
+ PostgresqlInputSourceDatabaseConnector.class
);
Assert.assertEquals(connector, andBack);
}
@@ -74,7 +74,7 @@ public String getConnectURI()
@Test
public void testEqualsAndHashcode()
{
- EqualsVerifier.forClass(PostgresqlFirehoseDatabaseConnector.class)
+ EqualsVerifier.forClass(PostgresqlInputSourceDatabaseConnector.class)
.usingGetClass()
.withNonnullFields("connectorConfig")
.withIgnoredFields("dbi")
@@ -95,7 +95,7 @@ public String getConnectURI()
JdbcAccessSecurityConfig securityConfig = newSecurityConfigEnforcingAllowList(ImmutableSet.of());
- new PostgresqlFirehoseDatabaseConnector(
+ new PostgresqlInputSourceDatabaseConnector(
connectorConfig,
securityConfig
);
@@ -115,7 +115,7 @@ public String getConnectURI()
JdbcAccessSecurityConfig securityConfig = newSecurityConfigEnforcingAllowList(ImmutableSet.of("user"));
- new PostgresqlFirehoseDatabaseConnector(
+ new PostgresqlInputSourceDatabaseConnector(
connectorConfig,
securityConfig
);
@@ -138,7 +138,7 @@ public String getConnectURI()
expectedException.expectMessage("is not in the allowed list");
expectedException.expect(IllegalArgumentException.class);
- new PostgresqlFirehoseDatabaseConnector(
+ new PostgresqlInputSourceDatabaseConnector(
connectorConfig,
securityConfig
);
@@ -160,7 +160,7 @@ public String getConnectURI()
ImmutableSet.of("user", "password", "keyonly", "etc")
);
- new PostgresqlFirehoseDatabaseConnector(
+ new PostgresqlInputSourceDatabaseConnector(
connectorConfig,
securityConfig
);
@@ -183,7 +183,7 @@ public String getConnectURI()
expectedException.expectMessage("is not in the allowed list");
expectedException.expect(IllegalArgumentException.class);
- new PostgresqlFirehoseDatabaseConnector(
+ new PostgresqlInputSourceDatabaseConnector(
connectorConfig,
securityConfig
);
@@ -206,7 +206,7 @@ public String getConnectURI()
expectedException.expectMessage("is not in the allowed list");
expectedException.expect(IllegalArgumentException.class);
- new PostgresqlFirehoseDatabaseConnector(
+ new PostgresqlInputSourceDatabaseConnector(
connectorConfig,
securityConfig
);
@@ -239,7 +239,7 @@ public boolean isEnforceAllowedProperties()
}
};
- new PostgresqlFirehoseDatabaseConnector(
+ new PostgresqlInputSourceDatabaseConnector(
connectorConfig,
securityConfig
);
diff --git a/indexing-hadoop/src/main/java/org/apache/druid/indexer/hadoop/DatasourceRecordReader.java b/indexing-hadoop/src/main/java/org/apache/druid/indexer/hadoop/DatasourceRecordReader.java
index 1e5da8af8ed6..40a67f1236e0 100644
--- a/indexing-hadoop/src/main/java/org/apache/druid/indexer/hadoop/DatasourceRecordReader.java
+++ b/indexing-hadoop/src/main/java/org/apache/druid/indexer/hadoop/DatasourceRecordReader.java
@@ -20,36 +20,62 @@
package org.apache.druid.indexer.hadoop;
import com.google.common.base.Function;
+import com.google.common.collect.Iterables;
import com.google.common.collect.Iterators;
import com.google.common.collect.Lists;
+import com.google.common.collect.Maps;
import com.google.common.io.Closeables;
import org.apache.druid.data.input.InputRow;
+import org.apache.druid.data.input.MapBasedInputRow;
import org.apache.druid.data.input.Row;
+import org.apache.druid.data.input.impl.TimestampSpec;
import org.apache.druid.indexer.HadoopDruidIndexerConfig;
import org.apache.druid.indexer.JobHelper;
+import org.apache.druid.java.util.common.DateTimes;
import org.apache.druid.java.util.common.FileUtils;
+import org.apache.druid.java.util.common.granularity.Granularities;
+import org.apache.druid.java.util.common.guava.Sequence;
+import org.apache.druid.java.util.common.guava.Sequences;
+import org.apache.druid.java.util.common.guava.Yielder;
+import org.apache.druid.java.util.common.guava.Yielders;
import org.apache.druid.java.util.common.logger.Logger;
+import org.apache.druid.query.dimension.DefaultDimensionSpec;
+import org.apache.druid.query.filter.DimFilter;
+import org.apache.druid.segment.BaseLongColumnValueSelector;
+import org.apache.druid.segment.BaseObjectColumnValueSelector;
+import org.apache.druid.segment.Cursor;
+import org.apache.druid.segment.DimensionSelector;
import org.apache.druid.segment.QueryableIndex;
import org.apache.druid.segment.QueryableIndexStorageAdapter;
-import org.apache.druid.segment.realtime.firehose.IngestSegmentFirehose;
-import org.apache.druid.segment.realtime.firehose.WindowedStorageAdapter;
+import org.apache.druid.segment.VirtualColumns;
+import org.apache.druid.segment.column.ColumnHolder;
+import org.apache.druid.segment.data.IndexedInts;
+import org.apache.druid.segment.filter.Filters;
+import org.apache.druid.segment.realtime.WindowedStorageAdapter;
+import org.apache.druid.segment.transform.TransformSpec;
+import org.apache.druid.segment.transform.Transformer;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.NullWritable;
import org.apache.hadoop.mapreduce.InputSplit;
import org.apache.hadoop.mapreduce.RecordReader;
import org.apache.hadoop.mapreduce.TaskAttemptContext;
+import javax.annotation.Nullable;
+import java.io.Closeable;
import java.io.File;
import java.io.IOException;
import java.util.ArrayList;
+import java.util.HashMap;
+import java.util.Iterator;
import java.util.List;
+import java.util.Map;
public class DatasourceRecordReader extends RecordReader<NullWritable, InputRow>
{
private static final Logger logger = new Logger(DatasourceRecordReader.class);
private DatasourceIngestionSpec spec;
- private IngestSegmentFirehose firehose;
+ private SegmentReader segmentReader;
private long rowNum;
private Row currRow;
@@ -108,7 +134,7 @@ public WindowedStorageAdapter apply(WindowedDataSegment segment)
}
);
- firehose = new IngestSegmentFirehose(
+ segmentReader = new SegmentReader(
adapters,
spec.getTransformSpec(),
spec.getDimensions(),
@@ -120,8 +146,8 @@ public WindowedStorageAdapter apply(WindowedDataSegment segment)
@Override
public boolean nextKeyValue()
{
- if (firehose.hasMore()) {
- currRow = firehose.nextRow();
+ if (segmentReader.hasMore()) {
+ currRow = segmentReader.nextRow();
rowNum++;
return true;
} else {
@@ -154,7 +180,7 @@ public float getProgress()
@Override
public void close() throws IOException
{
- Closeables.close(firehose, true);
+ Closeables.close(segmentReader, true);
for (QueryableIndex qi : indexes) {
Closeables.close(qi, true);
}
@@ -163,4 +189,157 @@ public void close() throws IOException
FileUtils.deleteDirectory(dir);
}
}
+
+ public static class SegmentReader implements Closeable
+ {
+ private final Transformer transformer;
+    private Yielder<InputRow> rowYielder;
+
+ public SegmentReader(
+        final List<WindowedStorageAdapter> adapters,
+        final TransformSpec transformSpec,
+        final List<String> dims,
+        final List<String> metrics,
+ final DimFilter dimFilter
+ )
+ {
+ this.transformer = transformSpec.toTransformer();
+
+      Sequence<InputRow> rows = Sequences.concat(
+ Iterables.transform(
+ adapters,
+              new Function<WindowedStorageAdapter, Sequence<InputRow>>()
+ {
+ @Nullable
+ @Override
+                public Sequence<InputRow> apply(WindowedStorageAdapter adapter)
+ {
+ return Sequences.concat(
+ Sequences.map(
+ adapter.getAdapter().makeCursors(
+ Filters.toFilter(dimFilter),
+ adapter.getInterval(),
+ VirtualColumns.EMPTY,
+ Granularities.ALL,
+ false,
+ null
+                      ), new Function<Cursor, Sequence<InputRow>>()
+ {
+ @Nullable
+ @Override
+                        public Sequence<InputRow> apply(final Cursor cursor)
+ {
+ final BaseLongColumnValueSelector timestampColumnSelector =
+ cursor.getColumnSelectorFactory().makeColumnValueSelector(ColumnHolder.TIME_COLUMN_NAME);
+
+                          final Map<String, DimensionSelector> dimSelectors = new HashMap<>();
+ for (String dim : dims) {
+ final DimensionSelector dimSelector = cursor
+ .getColumnSelectorFactory()
+ .makeDimensionSelector(new DefaultDimensionSpec(dim, dim));
+ // dimSelector is null if the dimension is not present
+ if (dimSelector != null) {
+ dimSelectors.put(dim, dimSelector);
+ }
+ }
+
+                          final Map<String, BaseObjectColumnValueSelector> metSelectors = new HashMap<>();
+ for (String metric : metrics) {
+ final BaseObjectColumnValueSelector metricSelector =
+ cursor.getColumnSelectorFactory().makeColumnValueSelector(metric);
+ metSelectors.put(metric, metricSelector);
+ }
+
+ return Sequences.simple(
+                              new Iterable<InputRow>()
+ {
+ @Override
+                                public Iterator<InputRow> iterator()
+ {
+                                  return new Iterator<InputRow>()
+ {
+ @Override
+ public boolean hasNext()
+ {
+ return !cursor.isDone();
+ }
+
+ @Override
+ public InputRow next()
+ {
+                                      final Map<String, Object> theEvent = Maps.newLinkedHashMap();
+ final long timestamp = timestampColumnSelector.getLong();
+ theEvent.put(TimestampSpec.DEFAULT_COLUMN, DateTimes.utc(timestamp));
+
+                                      for (Map.Entry<String, DimensionSelector> dimSelector :
+ dimSelectors.entrySet()) {
+ final String dim = dimSelector.getKey();
+ final DimensionSelector selector = dimSelector.getValue();
+ final IndexedInts vals = selector.getRow();
+
+ int valsSize = vals.size();
+ if (valsSize == 1) {
+ final String dimVal = selector.lookupName(vals.get(0));
+ theEvent.put(dim, dimVal);
+ } else if (valsSize > 1) {
+                                          List<String> dimVals = new ArrayList<>(valsSize);
+ for (int i = 0; i < valsSize; ++i) {
+ dimVals.add(selector.lookupName(vals.get(i)));
+ }
+ theEvent.put(dim, dimVals);
+ }
+ }
+
+                                      for (Map.Entry<String, BaseObjectColumnValueSelector> metSelector :
+ metSelectors.entrySet()) {
+ final String metric = metSelector.getKey();
+ final BaseObjectColumnValueSelector selector = metSelector.getValue();
+ Object value = selector.getObject();
+ if (value != null) {
+ theEvent.put(metric, value);
+ }
+ }
+ cursor.advance();
+ return new MapBasedInputRow(timestamp, dims, theEvent);
+ }
+
+ @Override
+ public void remove()
+ {
+ throw new UnsupportedOperationException("Remove Not Supported");
+ }
+ };
+ }
+ }
+ );
+ }
+ }
+ )
+ );
+ }
+ }
+ )
+ );
+ rowYielder = Yielders.each(rows);
+ }
+
+ public boolean hasMore()
+ {
+ return !rowYielder.isDone();
+ }
+
+ @Nullable
+ public InputRow nextRow()
+ {
+ final InputRow inputRow = rowYielder.get();
+ rowYielder = rowYielder.next(null);
+ return transformer.transform(inputRow);
+ }
+
+ @Override
+ public void close() throws IOException
+ {
+ rowYielder.close();
+ }
+ }
}
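
SegmentReader keeps the old firehose-style contract (hasMore / nextRow / close) under its new home inside DatasourceRecordReader. A minimal usage sketch, assuming a QueryableIndex loaded elsewhere and illustrative dimension/metric names:

    static List<InputRow> readAll(QueryableIndex index) throws IOException
    {
      final StorageAdapter adapter = new QueryableIndexStorageAdapter(index);
      final DatasourceRecordReader.SegmentReader reader = new DatasourceRecordReader.SegmentReader(
          ImmutableList.of(new WindowedStorageAdapter(adapter, adapter.getInterval())),
          TransformSpec.NONE,
          ImmutableList.of("host"),         // dimensions to read (illustrative)
          ImmutableList.of("visited_sum"),  // metrics to read (illustrative)
          null                              // no DimFilter
      );
      final List<InputRow> rows = new ArrayList<>();
      try {
        while (reader.hasMore()) {
          rows.add(reader.nextRow());
        }
      }
      finally {
        reader.close();
      }
      return rows;
    }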
diff --git a/indexing-hadoop/src/test/java/org/apache/druid/indexer/BatchDeltaIngestionTest.java b/indexing-hadoop/src/test/java/org/apache/druid/indexer/BatchDeltaIngestionTest.java
index ed8b8c0bb093..6516b0a0e00d 100644
--- a/indexing-hadoop/src/test/java/org/apache/druid/indexer/BatchDeltaIngestionTest.java
+++ b/indexing-hadoop/src/test/java/org/apache/druid/indexer/BatchDeltaIngestionTest.java
@@ -26,13 +26,13 @@
import com.google.common.collect.ImmutableMap;
import com.google.common.collect.Iterables;
import org.apache.commons.io.FileUtils;
-import org.apache.druid.data.input.Firehose;
import org.apache.druid.data.input.InputRow;
import org.apache.druid.data.input.impl.CSVParseSpec;
import org.apache.druid.data.input.impl.DimensionsSpec;
import org.apache.druid.data.input.impl.StringInputRowParser;
import org.apache.druid.data.input.impl.TimestampSpec;
import org.apache.druid.hll.HyperLogLogCollector;
+import org.apache.druid.indexer.hadoop.DatasourceRecordReader;
import org.apache.druid.indexer.hadoop.WindowedDataSegment;
import org.apache.druid.jackson.DefaultObjectMapper;
import org.apache.druid.java.util.common.DateTimes;
@@ -49,8 +49,7 @@
import org.apache.druid.segment.indexing.DataSchema;
import org.apache.druid.segment.indexing.granularity.UniformGranularitySpec;
import org.apache.druid.segment.loading.LocalDataSegmentPuller;
-import org.apache.druid.segment.realtime.firehose.IngestSegmentFirehose;
-import org.apache.druid.segment.realtime.firehose.WindowedStorageAdapter;
+import org.apache.druid.segment.realtime.WindowedStorageAdapter;
import org.apache.druid.segment.transform.TransformSpec;
import org.apache.druid.timeline.DataSegment;
import org.apache.druid.timeline.DataSegment.PruneSpecsHolder;
@@ -404,7 +403,7 @@ private void testIngestion(
QueryableIndex index = INDEX_IO.loadIndex(tmpUnzippedSegmentDir);
StorageAdapter adapter = new QueryableIndexStorageAdapter(index);
- Firehose firehose = new IngestSegmentFirehose(
+ DatasourceRecordReader.SegmentReader segmentReader = new DatasourceRecordReader.SegmentReader(
ImmutableList.of(new WindowedStorageAdapter(adapter, windowedDataSegment.getInterval())),
TransformSpec.NONE,
expectedDimensions,
@@ -413,11 +412,12 @@ private void testIngestion(
);
    List<InputRow> rows = new ArrayList<>();
- while (firehose.hasMore()) {
- rows.add(firehose.nextRow());
+ while (segmentReader.hasMore()) {
+ rows.add(segmentReader.nextRow());
}
verifyRows(expectedRowsGenerated, rows, expectedDimensions, expectedMetrics);
+ segmentReader.close();
}
  private HadoopDruidIndexerConfig makeHadoopDruidIndexerConfig(Map<String, Object> inputSpec, File tmpDir)
diff --git a/server/src/test/java/org/apache/druid/segment/realtime/firehose/IngestSegmentFirehoseTest.java b/indexing-hadoop/src/test/java/org/apache/druid/indexer/hadoop/DatasourceRecordReaderSegmentReaderTest.java
similarity index 92%
rename from server/src/test/java/org/apache/druid/segment/realtime/firehose/IngestSegmentFirehoseTest.java
rename to indexing-hadoop/src/test/java/org/apache/druid/indexer/hadoop/DatasourceRecordReaderSegmentReaderTest.java
index 44f24f879a22..042aa5e9550f 100644
--- a/server/src/test/java/org/apache/druid/segment/realtime/firehose/IngestSegmentFirehoseTest.java
+++ b/indexing-hadoop/src/test/java/org/apache/druid/indexer/hadoop/DatasourceRecordReaderSegmentReaderTest.java
@@ -17,7 +17,7 @@
* under the License.
*/
-package org.apache.druid.segment.realtime.firehose;
+package org.apache.druid.indexer.hadoop;
import com.google.common.collect.ImmutableList;
import com.google.common.collect.Lists;
@@ -47,6 +47,7 @@
import org.apache.druid.segment.incremental.IncrementalIndexSchema;
import org.apache.druid.segment.incremental.IncrementalIndexStorageAdapter;
import org.apache.druid.segment.incremental.OnheapIncrementalIndex;
+import org.apache.druid.segment.realtime.WindowedStorageAdapter;
import org.apache.druid.segment.transform.TransformSpec;
import org.apache.druid.segment.writeout.OffHeapMemorySegmentWriteOutMediumFactory;
import org.apache.druid.segment.writeout.SegmentWriteOutMediumFactory;
@@ -66,7 +67,7 @@
/**
*/
@RunWith(Parameterized.class)
-public class IngestSegmentFirehoseTest
+public class DatasourceRecordReaderSegmentReaderTest
{
private static final DimensionsSpec DIMENSIONS_SPEC = new DimensionsSpec(
ImmutableList.of(
@@ -107,7 +108,7 @@ public static Collection<Object[]> constructorFeeder()
private final IndexIO indexIO;
private final IndexMerger indexMerger;
- public IngestSegmentFirehoseTest(SegmentWriteOutMediumFactory segmentWriteOutMediumFactory)
+ public DatasourceRecordReaderSegmentReaderTest(SegmentWriteOutMediumFactory segmentWriteOutMediumFactory)
{
indexIO = TestHelper.getTestIndexIO();
indexMerger = TestHelper.getTestIndexMergerV9(segmentWriteOutMediumFactory);
@@ -135,7 +136,7 @@ public void testReadFromIndexAndWriteAnotherIndex() throws Exception
) {
final StorageAdapter sa = new QueryableIndexStorageAdapter(qi);
final WindowedStorageAdapter wsa = new WindowedStorageAdapter(sa, sa.getInterval());
- final IngestSegmentFirehose firehose = new IngestSegmentFirehose(
+ final DatasourceRecordReader.SegmentReader segmentReader = new DatasourceRecordReader.SegmentReader(
ImmutableList.of(wsa, wsa),
TransformSpec.NONE,
ImmutableList.of("host", "spatial"),
@@ -144,8 +145,8 @@ public void testReadFromIndexAndWriteAnotherIndex() throws Exception
);
int count = 0;
- while (firehose.hasMore()) {
- final InputRow row = firehose.nextRow();
+ while (segmentReader.hasMore()) {
+ final InputRow row = segmentReader.nextRow();
Assert.assertNotNull(row);
if (count == 0) {
Assert.assertEquals(DateTimes.of("2014-10-22T00Z"), row.getTimestamp());
@@ -168,15 +169,15 @@ public void testReadFromIndexAndWriteAnotherIndex() throws Exception
Assert.assertEquals(ImmutableList.of("visited_sum", "unique_hosts"), queryable.getAvailableMetrics());
// Do a spatial filter
- final IngestSegmentFirehose firehose2 = new IngestSegmentFirehose(
+ final DatasourceRecordReader.SegmentReader segmentReader2 = new DatasourceRecordReader.SegmentReader(
ImmutableList.of(new WindowedStorageAdapter(queryable, Intervals.of("2000/3000"))),
TransformSpec.NONE,
ImmutableList.of("host", "spatial"),
ImmutableList.of("visited_sum", "unique_hosts"),
new SpatialDimFilter("spatial", new RadiusBound(new float[]{1, 0}, 0.1f))
);
- final InputRow row = firehose2.nextRow();
- Assert.assertFalse(firehose2.hasMore());
+ final InputRow row = segmentReader2.nextRow();
+ Assert.assertFalse(segmentReader2.hasMore());
Assert.assertEquals(DateTimes.of("2014-10-22T00Z"), row.getTimestamp());
Assert.assertEquals("host2", row.getRaw("host"));
Assert.assertEquals("1,0", row.getRaw("spatial"));
diff --git a/indexing-service/src/main/java/org/apache/druid/guice/IndexingServiceFirehoseModule.java b/indexing-service/src/main/java/org/apache/druid/guice/IndexingServiceFirehoseModule.java
deleted file mode 100644
index 8e4952210997..000000000000
--- a/indexing-service/src/main/java/org/apache/druid/guice/IndexingServiceFirehoseModule.java
+++ /dev/null
@@ -1,49 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied. See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-
-package org.apache.druid.guice;
-
-import com.fasterxml.jackson.databind.Module;
-import com.fasterxml.jackson.databind.jsontype.NamedType;
-import com.fasterxml.jackson.databind.module.SimpleModule;
-import com.google.common.collect.ImmutableList;
-import com.google.inject.Binder;
-import org.apache.druid.initialization.DruidModule;
-import org.apache.druid.segment.realtime.firehose.EventReceiverFirehoseFactory;
-
-import java.util.List;
-
-public class IndexingServiceFirehoseModule implements DruidModule
-{
- @Override
- public List<? extends Module> getJacksonModules()
- {
- return ImmutableList.of(
- new SimpleModule("IndexingServiceFirehoseModule")
- .registerSubtypes(
- new NamedType(EventReceiverFirehoseFactory.class, "receiver")
- )
- );
- }
-
- @Override
- public void configure(Binder binder)
- {
- }
-}
diff --git a/indexing-service/src/main/java/org/apache/druid/indexing/common/TaskToolbox.java b/indexing-service/src/main/java/org/apache/druid/indexing/common/TaskToolbox.java
index 46de3064f033..9732d72eaf5e 100644
--- a/indexing-service/src/main/java/org/apache/druid/indexing/common/TaskToolbox.java
+++ b/indexing-service/src/main/java/org/apache/druid/indexing/common/TaskToolbox.java
@@ -58,9 +58,9 @@
import org.apache.druid.segment.loading.SegmentCacheManager;
import org.apache.druid.segment.loading.SegmentLoaderConfig;
import org.apache.druid.segment.metadata.CentralizedDatasourceSchemaConfig;
+import org.apache.druid.segment.realtime.ChatHandlerProvider;
import org.apache.druid.segment.realtime.appenderator.AppenderatorsManager;
import org.apache.druid.segment.realtime.appenderator.UnifiedIndexerAppenderatorsManager;
-import org.apache.druid.segment.realtime.firehose.ChatHandlerProvider;
import org.apache.druid.server.DruidNode;
import org.apache.druid.server.coordination.DataSegmentAnnouncer;
import org.apache.druid.server.coordination.DataSegmentServerAnnouncer;
diff --git a/indexing-service/src/main/java/org/apache/druid/indexing/common/TaskToolboxFactory.java b/indexing-service/src/main/java/org/apache/druid/indexing/common/TaskToolboxFactory.java
index 669a30dc5de9..7190c38849f2 100644
--- a/indexing-service/src/main/java/org/apache/druid/indexing/common/TaskToolboxFactory.java
+++ b/indexing-service/src/main/java/org/apache/druid/indexing/common/TaskToolboxFactory.java
@@ -59,8 +59,8 @@
import org.apache.druid.segment.loading.DataSegmentPusher;
import org.apache.druid.segment.loading.SegmentLoaderConfig;
import org.apache.druid.segment.metadata.CentralizedDatasourceSchemaConfig;
+import org.apache.druid.segment.realtime.ChatHandlerProvider;
import org.apache.druid.segment.realtime.appenderator.AppenderatorsManager;
-import org.apache.druid.segment.realtime.firehose.ChatHandlerProvider;
import org.apache.druid.server.DruidNode;
import org.apache.druid.server.coordination.DataSegmentAnnouncer;
import org.apache.druid.server.coordination.DataSegmentServerAnnouncer;
diff --git a/indexing-service/src/main/java/org/apache/druid/indexing/common/task/HadoopIndexTask.java b/indexing-service/src/main/java/org/apache/druid/indexing/common/task/HadoopIndexTask.java
index bc9d5aca2b65..f1f96d8ca34d 100644
--- a/indexing-service/src/main/java/org/apache/druid/indexing/common/task/HadoopIndexTask.java
+++ b/indexing-service/src/main/java/org/apache/druid/indexing/common/task/HadoopIndexTask.java
@@ -59,8 +59,8 @@
import org.apache.druid.segment.incremental.RowIngestionMeters;
import org.apache.druid.segment.indexing.granularity.ArbitraryGranularitySpec;
import org.apache.druid.segment.indexing.granularity.GranularitySpec;
-import org.apache.druid.segment.realtime.firehose.ChatHandler;
-import org.apache.druid.segment.realtime.firehose.ChatHandlerProvider;
+import org.apache.druid.segment.realtime.ChatHandler;
+import org.apache.druid.segment.realtime.ChatHandlerProvider;
import org.apache.druid.server.security.Action;
import org.apache.druid.server.security.AuthorizerMapper;
import org.apache.druid.server.security.Resource;
diff --git a/indexing-service/src/main/java/org/apache/druid/indexing/common/task/IndexTask.java b/indexing-service/src/main/java/org/apache/druid/indexing/common/task/IndexTask.java
index dc6c07b6b83c..a8f60cca5729 100644
--- a/indexing-service/src/main/java/org/apache/druid/indexing/common/task/IndexTask.java
+++ b/indexing-service/src/main/java/org/apache/druid/indexing/common/task/IndexTask.java
@@ -21,8 +21,6 @@
import com.fasterxml.jackson.annotation.JsonCreator;
import com.fasterxml.jackson.annotation.JsonIgnore;
-import com.fasterxml.jackson.annotation.JsonInclude;
-import com.fasterxml.jackson.annotation.JsonInclude.Include;
import com.fasterxml.jackson.annotation.JsonProperty;
import com.fasterxml.jackson.annotation.JsonTypeName;
import com.fasterxml.jackson.databind.ObjectMapper;
@@ -34,7 +32,6 @@
import com.google.common.hash.HashFunction;
import com.google.common.hash.Hashing;
import com.google.common.util.concurrent.ListenableFuture;
-import org.apache.druid.data.input.FirehoseFactory;
import org.apache.druid.data.input.InputFormat;
import org.apache.druid.data.input.InputRow;
import org.apache.druid.data.input.InputSource;
@@ -87,6 +84,7 @@
import org.apache.druid.segment.indexing.TuningConfig;
import org.apache.druid.segment.indexing.granularity.ArbitraryGranularitySpec;
import org.apache.druid.segment.indexing.granularity.GranularitySpec;
+import org.apache.druid.segment.realtime.ChatHandler;
import org.apache.druid.segment.realtime.SegmentGenerationMetrics;
import org.apache.druid.segment.realtime.appenderator.Appenderator;
import org.apache.druid.segment.realtime.appenderator.AppenderatorConfig;
@@ -95,7 +93,6 @@
import org.apache.druid.segment.realtime.appenderator.SegmentIdWithShardSpec;
import org.apache.druid.segment.realtime.appenderator.SegmentsAndCommitMetadata;
import org.apache.druid.segment.realtime.appenderator.TransactionalSegmentPublisher;
-import org.apache.druid.segment.realtime.firehose.ChatHandler;
import org.apache.druid.segment.writeout.SegmentWriteOutMediumFactory;
import org.apache.druid.server.security.Action;
import org.apache.druid.server.security.AuthorizerMapper;
@@ -312,9 +309,6 @@ public String getTaskAllocatorId()
@Override
  public Set<ResourceAction> getInputSourceResources()
{
- if (ingestionSchema.getIOConfig().firehoseFactory != null) {
- throw getInputSecurityOnFirehoseUnsupportedError();
- }
return getIngestionSchema().getIOConfig().getInputSource() != null ?
getIngestionSchema().getIOConfig().getInputSource().getTypes()
.stream()
@@ -1123,8 +1117,6 @@ public IndexTuningConfig getTuningConfig()
@JsonTypeName("index")
public static class IndexIOConfig implements BatchIOConfig
{
-
- private final FirehoseFactory firehoseFactory;
private final InputSource inputSource;
    private final AtomicReference<InputSource> inputSourceWithToolbox = new AtomicReference<>();
private final InputFormat inputFormat;
@@ -1133,42 +1125,18 @@ public static class IndexIOConfig implements BatchIOConfig
@JsonCreator
public IndexIOConfig(
- @Deprecated @JsonProperty("firehose") @Nullable FirehoseFactory firehoseFactory,
@JsonProperty("inputSource") @Nullable InputSource inputSource,
@JsonProperty("inputFormat") @Nullable InputFormat inputFormat,
@JsonProperty("appendToExisting") @Nullable Boolean appendToExisting,
@JsonProperty("dropExisting") @Nullable Boolean dropExisting
)
{
- Checks.checkOneNotNullOrEmpty(
- ImmutableList.of(new Property<>("firehose", firehoseFactory), new Property<>("inputSource", inputSource))
- );
- if (firehoseFactory != null && inputFormat != null) {
- throw new IAE("Cannot use firehose and inputFormat together. Try using inputSource instead of firehose.");
- }
- this.firehoseFactory = firehoseFactory;
this.inputSource = inputSource;
this.inputFormat = inputFormat;
this.appendToExisting = appendToExisting == null ? BatchIOConfig.DEFAULT_APPEND_EXISTING : appendToExisting;
this.dropExisting = dropExisting == null ? BatchIOConfig.DEFAULT_DROP_EXISTING : dropExisting;
}
- // old constructor for backward compatibility
- @Deprecated
- public IndexIOConfig(FirehoseFactory firehoseFactory, @Nullable Boolean appendToExisting, @Nullable Boolean dropExisting)
- {
- this(firehoseFactory, null, null, appendToExisting, dropExisting);
- }
-
- @Nullable
- @JsonProperty("firehose")
- @JsonInclude(Include.NON_NULL)
- @Deprecated
- public FirehoseFactory getFirehoseFactory()
- {
- return firehoseFactory;
- }
-
@Nullable
@Override
@JsonProperty
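
With the firehose property and its deprecated constructors gone, an IOConfig is built purely from an InputSource plus an optional InputFormat; appendToExisting and dropExisting fall back to the BatchIOConfig defaults when null. A small illustrative helper (the method name below is hypothetical, not part of the patch):

    static IndexTask.IndexIOConfig ioConfigFor(InputSource source, InputFormat format)
    {
      // nulls mean: use BatchIOConfig.DEFAULT_APPEND_EXISTING and DEFAULT_DROP_EXISTING
      return new IndexTask.IndexIOConfig(source, format, null, null);
    }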
diff --git a/indexing-service/src/main/java/org/apache/druid/indexing/common/task/NativeCompactionRunner.java b/indexing-service/src/main/java/org/apache/druid/indexing/common/task/NativeCompactionRunner.java
index f2eacb8c1c6c..30761b674e54 100644
--- a/indexing-service/src/main/java/org/apache/druid/indexing/common/task/NativeCompactionRunner.java
+++ b/indexing-service/src/main/java/org/apache/druid/indexing/common/task/NativeCompactionRunner.java
@@ -157,7 +157,6 @@ private static ParallelIndexIOConfig createIoConfig(
}
return new ParallelIndexIOConfig(
- null,
new DruidInputSource(
dataSchema.getDataSource(),
interval,
diff --git a/indexing-service/src/main/java/org/apache/druid/indexing/common/task/Task.java b/indexing-service/src/main/java/org/apache/druid/indexing/common/task/Task.java
index 4fb4bb7852c0..9b882e2e8d2b 100644
--- a/indexing-service/src/main/java/org/apache/druid/indexing/common/task/Task.java
+++ b/indexing-service/src/main/java/org/apache/druid/indexing/common/task/Task.java
@@ -152,8 +152,7 @@ default int getPriority()
* the task does not use any. Users can be given permission to access particular types of
* input sources but not others, using the
* {@link org.apache.druid.server.security.AuthConfig#enableInputSourceSecurity} config.
- * @throws UnsupportedOperationException if the given task type does not suppoert input source based security. Such
- * would be the case, if the task uses firehose.
+ * @throws UnsupportedOperationException if the given task type does not support input source based security
*/
@JsonIgnore
@Nonnull
@@ -165,15 +164,6 @@ default Set<ResourceAction> getInputSourceResources() throws UOE
));
}
- default UOE getInputSecurityOnFirehoseUnsupportedError()
- {
- throw new UOE(StringUtils.format(
- "Input source based security cannot be performed '%s' task because it uses firehose."
- + " Change the tasks configuration, or disable `isEnableInputSourceSecurity`",
- getType()
- ));
- }
-
/**
* Returns query runners for this task. If this task is not meant to answer queries over its datasource, this method
* should return null.
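
With the firehose-specific error helper removed, the task implementations updated below derive their resources directly from the input source. The idiom their truncated method bodies follow looks roughly like this (a sketch of the shared pattern, not a verbatim copy of any one task):

    @Override
    public Set<ResourceAction> getInputSourceResources()
    {
      return getIngestionSchema().getIOConfig().getInputSource() != null
             ? getIngestionSchema().getIOConfig().getInputSource().getTypes()
                                   .stream()
                                   .map(i -> new ResourceAction(new Resource(i, ResourceType.EXTERNAL), Action.READ))
                                   .collect(Collectors.toSet())
             : ImmutableSet.of();
    }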
diff --git a/indexing-service/src/main/java/org/apache/druid/indexing/common/task/batch/parallel/InputSourceSplitParallelIndexTaskRunner.java b/indexing-service/src/main/java/org/apache/druid/indexing/common/task/batch/parallel/InputSourceSplitParallelIndexTaskRunner.java
index 286325186b25..e6097678b5ae 100644
--- a/indexing-service/src/main/java/org/apache/druid/indexing/common/task/batch/parallel/InputSourceSplitParallelIndexTaskRunner.java
+++ b/indexing-service/src/main/java/org/apache/druid/indexing/common/task/batch/parallel/InputSourceSplitParallelIndexTaskRunner.java
@@ -19,7 +19,6 @@
package org.apache.druid.indexing.common.task.batch.parallel;
-import org.apache.druid.data.input.FirehoseFactory;
import org.apache.druid.data.input.InputSource;
import org.apache.druid.data.input.InputSplit;
import org.apache.druid.data.input.impl.SplittableInputSource;
@@ -81,14 +80,11 @@ final int estimateTotalNumSubTasks() throws IOException
final SubTaskSpec newTaskSpec(InputSplit split)
{
- final FirehoseFactory firehoseFactory;
final InputSource inputSource;
- firehoseFactory = null;
inputSource = baseInputSource.withSplit(split);
final ParallelIndexIngestionSpec subTaskIngestionSpec = new ParallelIndexIngestionSpec(
ingestionSchema.getDataSchema(),
new ParallelIndexIOConfig(
- firehoseFactory,
inputSource,
ingestionSchema.getIOConfig().getInputFormat(),
ingestionSchema.getIOConfig().isAppendToExisting(),
diff --git a/indexing-service/src/main/java/org/apache/druid/indexing/common/task/batch/parallel/LegacySinglePhaseSubTask.java b/indexing-service/src/main/java/org/apache/druid/indexing/common/task/batch/parallel/LegacySinglePhaseSubTask.java
index 1daa99994274..27a242885c36 100644
--- a/indexing-service/src/main/java/org/apache/druid/indexing/common/task/batch/parallel/LegacySinglePhaseSubTask.java
+++ b/indexing-service/src/main/java/org/apache/druid/indexing/common/task/batch/parallel/LegacySinglePhaseSubTask.java
@@ -71,9 +71,6 @@ public String getType()
@Override
  public Set<ResourceAction> getInputSourceResources()
{
- if (getIngestionSchema().getIOConfig().getFirehoseFactory() != null) {
- throw getInputSecurityOnFirehoseUnsupportedError();
- }
return getIngestionSchema().getIOConfig().getInputSource() != null ?
getIngestionSchema().getIOConfig().getInputSource().getTypes()
.stream()
diff --git a/indexing-service/src/main/java/org/apache/druid/indexing/common/task/batch/parallel/ParallelIndexIOConfig.java b/indexing-service/src/main/java/org/apache/druid/indexing/common/task/batch/parallel/ParallelIndexIOConfig.java
index c86446d91303..ff10e556de8f 100644
--- a/indexing-service/src/main/java/org/apache/druid/indexing/common/task/batch/parallel/ParallelIndexIOConfig.java
+++ b/indexing-service/src/main/java/org/apache/druid/indexing/common/task/batch/parallel/ParallelIndexIOConfig.java
@@ -22,7 +22,6 @@
import com.fasterxml.jackson.annotation.JsonCreator;
import com.fasterxml.jackson.annotation.JsonProperty;
import com.fasterxml.jackson.annotation.JsonTypeName;
-import org.apache.druid.data.input.FirehoseFactory;
import org.apache.druid.data.input.InputFormat;
import org.apache.druid.data.input.InputSource;
import org.apache.druid.indexing.common.task.IndexTask.IndexIOConfig;
@@ -37,26 +36,12 @@ public class ParallelIndexIOConfig extends IndexIOConfig
{
@JsonCreator
public ParallelIndexIOConfig(
- @JsonProperty("firehose") @Nullable FirehoseFactory firehoseFactory,
@JsonProperty("inputSource") @Nullable InputSource inputSource,
@JsonProperty("inputFormat") @Nullable InputFormat inputFormat,
@JsonProperty("appendToExisting") @Nullable Boolean appendToExisting,
@JsonProperty("dropExisting") @Nullable Boolean dropExisting
)
{
- super(firehoseFactory, inputSource, inputFormat, appendToExisting, dropExisting);
- }
-
- // old constructor for backward compatibility
- @Deprecated
- public ParallelIndexIOConfig(FirehoseFactory firehoseFactory, @Nullable Boolean appendToExisting)
- {
- this(firehoseFactory, null, null, appendToExisting, null);
- }
-
- @Deprecated
- public ParallelIndexIOConfig(FirehoseFactory firehoseFactory, @Nullable Boolean appendToExisting, boolean dropExisting)
- {
- this(firehoseFactory, null, null, appendToExisting, dropExisting);
+ super(inputSource, inputFormat, appendToExisting, dropExisting);
}
}
diff --git a/indexing-service/src/main/java/org/apache/druid/indexing/common/task/batch/parallel/ParallelIndexSupervisorTask.java b/indexing-service/src/main/java/org/apache/druid/indexing/common/task/batch/parallel/ParallelIndexSupervisorTask.java
index 4ca0f1ff80d7..6039a62bc984 100644
--- a/indexing-service/src/main/java/org/apache/druid/indexing/common/task/batch/parallel/ParallelIndexSupervisorTask.java
+++ b/indexing-service/src/main/java/org/apache/druid/indexing/common/task/batch/parallel/ParallelIndexSupervisorTask.java
@@ -75,10 +75,10 @@
import org.apache.druid.segment.indexing.granularity.ArbitraryGranularitySpec;
import org.apache.druid.segment.indexing.granularity.GranularitySpec;
import org.apache.druid.segment.metadata.CentralizedDatasourceSchemaConfig;
+import org.apache.druid.segment.realtime.ChatHandler;
+import org.apache.druid.segment.realtime.ChatHandlers;
import org.apache.druid.segment.realtime.appenderator.SegmentIdWithShardSpec;
import org.apache.druid.segment.realtime.appenderator.TransactionalSegmentPublisher;
-import org.apache.druid.segment.realtime.firehose.ChatHandler;
-import org.apache.druid.segment.realtime.firehose.ChatHandlers;
import org.apache.druid.server.security.Action;
import org.apache.druid.server.security.AuthorizerMapper;
import org.apache.druid.server.security.Resource;
@@ -290,9 +290,6 @@ public String getType()
@Override
  public Set<ResourceAction> getInputSourceResources()
{
- if (getIngestionSchema().getIOConfig().getFirehoseFactory() != null) {
- throw getInputSecurityOnFirehoseUnsupportedError();
- }
return getIngestionSchema().getIOConfig().getInputSource() != null ?
getIngestionSchema().getIOConfig().getInputSource().getTypes()
.stream()
@@ -556,7 +553,7 @@ public TaskStatus runTask(TaskToolbox toolbox) throws Exception
} else {
if (!baseInputSource.isSplittable()) {
LOG.warn(
- "firehoseFactory[%s] is not splittable. Running sequentially.",
+ "inputSource[%s] is not splittable. Running sequentially.",
baseInputSource.getClass().getSimpleName()
);
} else if (ingestionSchema.getTuningConfig().getMaxNumConcurrentSubTasks() <= 1) {
diff --git a/indexing-service/src/main/java/org/apache/druid/indexing/common/task/batch/parallel/PartialDimensionCardinalityTask.java b/indexing-service/src/main/java/org/apache/druid/indexing/common/task/batch/parallel/PartialDimensionCardinalityTask.java
index 6724a2ebb90c..d75304f38c08 100644
--- a/indexing-service/src/main/java/org/apache/druid/indexing/common/task/batch/parallel/PartialDimensionCardinalityTask.java
+++ b/indexing-service/src/main/java/org/apache/druid/indexing/common/task/batch/parallel/PartialDimensionCardinalityTask.java
@@ -139,9 +139,6 @@ public String getType()
@Override
  public Set<ResourceAction> getInputSourceResources()
{
- if (getIngestionSchema().getIOConfig().getFirehoseFactory() != null) {
- throw getInputSecurityOnFirehoseUnsupportedError();
- }
return getIngestionSchema().getIOConfig().getInputSource() != null ?
getIngestionSchema().getIOConfig().getInputSource().getTypes()
.stream()
diff --git a/indexing-service/src/main/java/org/apache/druid/indexing/common/task/batch/parallel/PartialDimensionDistributionTask.java b/indexing-service/src/main/java/org/apache/druid/indexing/common/task/batch/parallel/PartialDimensionDistributionTask.java
index cb361a204ea3..4bb395420b3f 100644
--- a/indexing-service/src/main/java/org/apache/druid/indexing/common/task/batch/parallel/PartialDimensionDistributionTask.java
+++ b/indexing-service/src/main/java/org/apache/druid/indexing/common/task/batch/parallel/PartialDimensionDistributionTask.java
@@ -180,9 +180,6 @@ public String getType()
@Override
  public Set<ResourceAction> getInputSourceResources()
{
- if (getIngestionSchema().getIOConfig().getFirehoseFactory() != null) {
- throw getInputSecurityOnFirehoseUnsupportedError();
- }
return getIngestionSchema().getIOConfig().getInputSource() != null ?
getIngestionSchema().getIOConfig().getInputSource().getTypes()
.stream()
diff --git a/indexing-service/src/main/java/org/apache/druid/indexing/common/task/batch/parallel/PartialHashSegmentGenerateTask.java b/indexing-service/src/main/java/org/apache/druid/indexing/common/task/batch/parallel/PartialHashSegmentGenerateTask.java
index 383fc7afb2e0..4c224e396d18 100644
--- a/indexing-service/src/main/java/org/apache/druid/indexing/common/task/batch/parallel/PartialHashSegmentGenerateTask.java
+++ b/indexing-service/src/main/java/org/apache/druid/indexing/common/task/batch/parallel/PartialHashSegmentGenerateTask.java
@@ -135,9 +135,6 @@ public String getType()
@Override
  public Set<ResourceAction> getInputSourceResources()
{
- if (getIngestionSchema().getIOConfig().getFirehoseFactory() != null) {
- throw getInputSecurityOnFirehoseUnsupportedError();
- }
return getIngestionSchema().getIOConfig().getInputSource() != null ?
getIngestionSchema().getIOConfig().getInputSource().getTypes()
.stream()
diff --git a/indexing-service/src/main/java/org/apache/druid/indexing/common/task/batch/parallel/PartialRangeSegmentGenerateTask.java b/indexing-service/src/main/java/org/apache/druid/indexing/common/task/batch/parallel/PartialRangeSegmentGenerateTask.java
index cf7ae15a9a59..933df9ee7785 100644
--- a/indexing-service/src/main/java/org/apache/druid/indexing/common/task/batch/parallel/PartialRangeSegmentGenerateTask.java
+++ b/indexing-service/src/main/java/org/apache/druid/indexing/common/task/batch/parallel/PartialRangeSegmentGenerateTask.java
@@ -153,9 +153,6 @@ public String getType()
@Override
  public Set<ResourceAction> getInputSourceResources()
{
- if (getIngestionSchema().getIOConfig().getFirehoseFactory() != null) {
- throw getInputSecurityOnFirehoseUnsupportedError();
- }
return getIngestionSchema().getIOConfig().getInputSource() != null ?
getIngestionSchema().getIOConfig().getInputSource().getTypes()
.stream()
diff --git a/indexing-service/src/main/java/org/apache/druid/indexing/common/task/batch/parallel/PartialSegmentGenerateTask.java b/indexing-service/src/main/java/org/apache/druid/indexing/common/task/batch/parallel/PartialSegmentGenerateTask.java
index c53557c66558..620d593467ad 100644
--- a/indexing-service/src/main/java/org/apache/druid/indexing/common/task/batch/parallel/PartialSegmentGenerateTask.java
+++ b/indexing-service/src/main/java/org/apache/druid/indexing/common/task/batch/parallel/PartialSegmentGenerateTask.java
@@ -37,8 +37,8 @@
import org.apache.druid.indexing.common.task.TaskResource;
import org.apache.druid.indexing.common.task.Tasks;
import org.apache.druid.indexing.common.task.batch.parallel.iterator.IndexTaskInputRowIteratorBuilder;
-import org.apache.druid.indexing.firehose.WindowedSegmentId;
import org.apache.druid.indexing.input.DruidInputSource;
+import org.apache.druid.indexing.input.WindowedSegmentId;
import org.apache.druid.indexing.worker.shuffle.ShuffleDataSegmentPusher;
import org.apache.druid.java.util.common.Pair;
import org.apache.druid.segment.SegmentSchemaMapping;
diff --git a/indexing-service/src/main/java/org/apache/druid/indexing/common/task/batch/parallel/SinglePhaseParallelIndexTaskRunner.java b/indexing-service/src/main/java/org/apache/druid/indexing/common/task/batch/parallel/SinglePhaseParallelIndexTaskRunner.java
index ce6aee98af35..a2a29b3cdd3e 100644
--- a/indexing-service/src/main/java/org/apache/druid/indexing/common/task/batch/parallel/SinglePhaseParallelIndexTaskRunner.java
+++ b/indexing-service/src/main/java/org/apache/druid/indexing/common/task/batch/parallel/SinglePhaseParallelIndexTaskRunner.java
@@ -21,7 +21,6 @@
import com.google.common.annotations.VisibleForTesting;
import org.apache.commons.lang3.mutable.MutableObject;
-import org.apache.druid.data.input.FirehoseFactory;
import org.apache.druid.data.input.InputSource;
import org.apache.druid.data.input.InputSplit;
import org.apache.druid.data.input.impl.SplittableInputSource;
@@ -174,9 +173,7 @@ int estimateTotalNumSubTasks() throws IOException
@VisibleForTesting
SubTaskSpec<SinglePhaseSubTask> newTaskSpec(InputSplit split)
{
- final FirehoseFactory firehoseFactory;
final InputSource inputSource;
- firehoseFactory = null;
inputSource = baseInputSource.withSplit(split);
final Map<String, Object> subtaskContext = new HashMap<>(getContext());
@@ -187,7 +184,6 @@ SubTaskSpec<SinglePhaseSubTask> newTaskSpec(InputSplit split)
new ParallelIndexIngestionSpec(
ingestionSchema.getDataSchema(),
new ParallelIndexIOConfig(
- firehoseFactory,
inputSource,
ingestionSchema.getIOConfig().getInputFormat(),
ingestionSchema.getIOConfig().isAppendToExisting(),
diff --git a/indexing-service/src/main/java/org/apache/druid/indexing/common/task/batch/parallel/SinglePhaseSubTask.java b/indexing-service/src/main/java/org/apache/druid/indexing/common/task/batch/parallel/SinglePhaseSubTask.java
index cd05e50ff1c8..8d49a7f3dbe7 100644
--- a/indexing-service/src/main/java/org/apache/druid/indexing/common/task/batch/parallel/SinglePhaseSubTask.java
+++ b/indexing-service/src/main/java/org/apache/druid/indexing/common/task/batch/parallel/SinglePhaseSubTask.java
@@ -60,13 +60,13 @@
import org.apache.druid.segment.indexing.granularity.ArbitraryGranularitySpec;
import org.apache.druid.segment.indexing.granularity.GranularitySpec;
import org.apache.druid.segment.metadata.CentralizedDatasourceSchemaConfig;
+import org.apache.druid.segment.realtime.ChatHandler;
import org.apache.druid.segment.realtime.SegmentGenerationMetrics;
import org.apache.druid.segment.realtime.appenderator.Appenderator;
import org.apache.druid.segment.realtime.appenderator.AppenderatorDriverAddResult;
import org.apache.druid.segment.realtime.appenderator.BaseAppenderatorDriver;
import org.apache.druid.segment.realtime.appenderator.BatchAppenderatorDriver;
import org.apache.druid.segment.realtime.appenderator.SegmentsAndCommitMetadata;
-import org.apache.druid.segment.realtime.firehose.ChatHandler;
import org.apache.druid.server.security.Action;
import org.apache.druid.server.security.AuthorizerMapper;
import org.apache.druid.server.security.Resource;
@@ -198,9 +198,6 @@ public String getType()
@Override
public Set<ResourceAction> getInputSourceResources()
{
- if (getIngestionSchema().getIOConfig().getFirehoseFactory() != null) {
- throw getInputSecurityOnFirehoseUnsupportedError();
- }
return getIngestionSchema().getIOConfig().getInputSource() != null ?
getIngestionSchema().getIOConfig().getInputSource().getTypes()
.stream()
diff --git a/indexing-service/src/main/java/org/apache/druid/indexing/common/task/batch/parallel/iterator/IndexTaskInputRowIteratorBuilder.java b/indexing-service/src/main/java/org/apache/druid/indexing/common/task/batch/parallel/iterator/IndexTaskInputRowIteratorBuilder.java
index 89afdbb64d05..3e3d653e3c32 100644
--- a/indexing-service/src/main/java/org/apache/druid/indexing/common/task/batch/parallel/iterator/IndexTaskInputRowIteratorBuilder.java
+++ b/indexing-service/src/main/java/org/apache/druid/indexing/common/task/batch/parallel/iterator/IndexTaskInputRowIteratorBuilder.java
@@ -19,7 +19,6 @@
package org.apache.druid.indexing.common.task.batch.parallel.iterator;
-import org.apache.druid.data.input.Firehose;
import org.apache.druid.data.input.HandlingInputRowIterator;
import org.apache.druid.data.input.InputRow;
import org.apache.druid.java.util.common.parsers.CloseableIterator;
@@ -34,7 +33,6 @@ public interface IndexTaskInputRowIteratorBuilder
/**
* @param granularitySpec {@link GranularitySpec} for the {@link org.apache.druid.segment.indexing.DataSchema}
- * associated with the {@link Firehose}.
*/
IndexTaskInputRowIteratorBuilder granularitySpec(GranularitySpec granularitySpec);
diff --git a/indexing-service/src/main/java/org/apache/druid/indexing/input/DruidInputSource.java b/indexing-service/src/main/java/org/apache/druid/indexing/input/DruidInputSource.java
index dd1998645b34..84c17f1a8fa0 100644
--- a/indexing-service/src/main/java/org/apache/druid/indexing/input/DruidInputSource.java
+++ b/indexing-service/src/main/java/org/apache/druid/indexing/input/DruidInputSource.java
@@ -51,7 +51,6 @@
import org.apache.druid.indexing.common.TaskToolbox;
import org.apache.druid.indexing.common.actions.RetrieveUsedSegmentsAction;
import org.apache.druid.indexing.common.config.TaskConfig;
-import org.apache.druid.indexing.firehose.WindowedSegmentId;
import org.apache.druid.java.util.common.CloseableIterators;
import org.apache.druid.java.util.common.IAE;
import org.apache.druid.java.util.common.ISE;
@@ -133,7 +132,7 @@ public class DruidInputSource extends AbstractInputSource implements SplittableI
private final String dataSource;
// Exactly one of interval and segmentIds should be non-null. Typically 'interval' is specified directly
- // by the user creating this firehose and 'segmentIds' is used for sub-tasks if it is split for parallel
+ // by the user creating this input source and 'segmentIds' is used for sub-tasks if it is split for parallel
// batch ingestion.
@Nullable
private final Interval interval;
@@ -164,7 +163,7 @@ public class DruidInputSource extends AbstractInputSource implements SplittableI
public DruidInputSource(
@JsonProperty("dataSource") final String dataSource,
@JsonProperty("interval") @Nullable Interval interval,
- // Specifying "segments" is intended only for when this FirehoseFactory has split itself,
+ // Specifying "segments" is intended only for when this input source has split itself,
// not for direct end user use.
@JsonProperty("segments") @Nullable List<WindowedSegmentId> segmentIds,
@JsonProperty("filter") DimFilter dimFilter,
diff --git a/indexing-service/src/main/java/org/apache/druid/indexing/input/DruidSegmentReader.java b/indexing-service/src/main/java/org/apache/druid/indexing/input/DruidSegmentReader.java
index d048cf4d8b91..42618556d003 100644
--- a/indexing-service/src/main/java/org/apache/druid/indexing/input/DruidSegmentReader.java
+++ b/indexing-service/src/main/java/org/apache/druid/indexing/input/DruidSegmentReader.java
@@ -56,7 +56,7 @@
import org.apache.druid.segment.column.ColumnType;
import org.apache.druid.segment.data.IndexedInts;
import org.apache.druid.segment.filter.Filters;
-import org.apache.druid.segment.realtime.firehose.WindowedStorageAdapter;
+import org.apache.druid.segment.realtime.WindowedStorageAdapter;
import org.apache.druid.utils.CloseableUtils;
import org.apache.druid.utils.CollectionUtils;
import org.joda.time.Interval;
diff --git a/indexing-service/src/main/java/org/apache/druid/indexing/firehose/WindowedSegmentId.java b/indexing-service/src/main/java/org/apache/druid/indexing/input/WindowedSegmentId.java
similarity index 98%
rename from indexing-service/src/main/java/org/apache/druid/indexing/firehose/WindowedSegmentId.java
rename to indexing-service/src/main/java/org/apache/druid/indexing/input/WindowedSegmentId.java
index b55510a7e219..8232aba04460 100644
--- a/indexing-service/src/main/java/org/apache/druid/indexing/firehose/WindowedSegmentId.java
+++ b/indexing-service/src/main/java/org/apache/druid/indexing/input/WindowedSegmentId.java
@@ -17,7 +17,7 @@
* under the License.
*/
-package org.apache.druid.indexing.firehose;
+package org.apache.druid.indexing.input;
import com.fasterxml.jackson.annotation.JsonCreator;
import com.fasterxml.jackson.annotation.JsonProperty;
diff --git a/indexing-service/src/main/java/org/apache/druid/indexing/overlord/sampler/InputSourceSampler.java b/indexing-service/src/main/java/org/apache/druid/indexing/overlord/sampler/InputSourceSampler.java
index a84cb4e1979c..5018795cbb4f 100644
--- a/indexing-service/src/main/java/org/apache/druid/indexing/overlord/sampler/InputSourceSampler.java
+++ b/indexing-service/src/main/java/org/apache/druid/indexing/overlord/sampler/InputSourceSampler.java
@@ -76,8 +76,8 @@ public class InputSourceSampler
null
);
- // We want to be able to sort the list of processed results back into the same order that we read them from the
- // firehose so that the rows in the data loader are not always changing. To do this, we add a temporary column to the
+ // We want to be able to sort the list of processed results back into the same order that we read them from the input
+ // source so that the rows in the data loader are not always changing. To do this, we add a temporary column to the
// InputRow (in SamplerInputRow) and tag each row with a sortKey. We use an aggregator so that it will not affect
// rollup, and we use a longMin aggregator so that as rows get rolled up, the earlier rows stay stable and later
// rows may get rolled into these rows. After getting the results back from the IncrementalIndex, we sort by this
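The rewritten comment above describes a small but easy-to-miss trick: each sampled row is tagged with an increasing sort key, and because that key is aggregated with a min-style aggregator, a rolled-up row keeps the key of its earliest constituent, so the sampler output can be put back in read order. Below is a minimal, self-contained sketch of that idea; the class and field names are invented for illustration and none of this is Druid's actual sampler code.

```java
// Illustrative sketch only: not Druid's InputSourceSampler, just the idea its comment
// describes. Each parsed row is tagged with a monotonically increasing sort key, and
// rolled-up groups keep the MIN of the keys, so earlier rows stay stable in the output.
import java.util.ArrayList;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;

public class SamplerSortKeySketch
{
  // A grouped (rolled-up) row: the grouping key plus the smallest sort key seen so far.
  static final class GroupedRow
  {
    final String groupKey;
    long minSortKey;
    long count;

    GroupedRow(String groupKey, long sortKey)
    {
      this.groupKey = groupKey;
      this.minSortKey = sortKey;
      this.count = 1;
    }
  }

  public static void main(String[] args)
  {
    // Raw rows in read order; duplicates roll up into one grouped row.
    List<String> rawRows = List.of("a", "b", "a", "c", "b");

    Map<String, GroupedRow> grouped = new LinkedHashMap<>();
    long nextSortKey = 0;
    for (String row : rawRows) {
      long sortKey = nextSortKey++;
      grouped.merge(
          row,
          new GroupedRow(row, sortKey),
          (existing, incoming) -> {
            // longMin-style aggregation: later rows fold into the earlier group
            // without disturbing its position.
            existing.minSortKey = Math.min(existing.minSortKey, incoming.minSortKey);
            existing.count++;
            return existing;
          }
      );
    }

    // Sort the grouped results by the retained min sort key to restore read order.
    List<GroupedRow> result = new ArrayList<>(grouped.values());
    result.sort((l, r) -> Long.compare(l.minSortKey, r.minSortKey));
    result.forEach(g -> System.out.println(g.groupKey + " (count=" + g.count + ", sortKey=" + g.minSortKey + ")"));
  }
}
```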
diff --git a/indexing-service/src/main/java/org/apache/druid/indexing/seekablestream/SeekableStreamIndexTask.java b/indexing-service/src/main/java/org/apache/druid/indexing/seekablestream/SeekableStreamIndexTask.java
index ee4eed572dff..41cd084cd960 100644
--- a/indexing-service/src/main/java/org/apache/druid/indexing/seekablestream/SeekableStreamIndexTask.java
+++ b/indexing-service/src/main/java/org/apache/druid/indexing/seekablestream/SeekableStreamIndexTask.java
@@ -49,10 +49,10 @@
import org.apache.druid.segment.incremental.ParseExceptionHandler;
import org.apache.druid.segment.incremental.RowIngestionMeters;
import org.apache.druid.segment.indexing.DataSchema;
+import org.apache.druid.segment.realtime.ChatHandler;
import org.apache.druid.segment.realtime.SegmentGenerationMetrics;
import org.apache.druid.segment.realtime.appenderator.Appenderator;
import org.apache.druid.segment.realtime.appenderator.StreamAppenderatorDriver;
-import org.apache.druid.segment.realtime.firehose.ChatHandler;
import org.apache.druid.server.security.AuthorizerMapper;
import org.apache.druid.timeline.partition.NumberedPartialShardSpec;
import org.checkerframework.checker.nullness.qual.MonotonicNonNull;
diff --git a/indexing-service/src/main/java/org/apache/druid/indexing/seekablestream/SeekableStreamIndexTaskRunner.java b/indexing-service/src/main/java/org/apache/druid/indexing/seekablestream/SeekableStreamIndexTaskRunner.java
index fad7b923ca97..d347fd815038 100644
--- a/indexing-service/src/main/java/org/apache/druid/indexing/seekablestream/SeekableStreamIndexTaskRunner.java
+++ b/indexing-service/src/main/java/org/apache/druid/indexing/seekablestream/SeekableStreamIndexTaskRunner.java
@@ -81,13 +81,13 @@
import org.apache.druid.segment.incremental.ParseExceptionHandler;
import org.apache.druid.segment.incremental.ParseExceptionReport;
import org.apache.druid.segment.incremental.RowIngestionMeters;
+import org.apache.druid.segment.realtime.ChatHandler;
import org.apache.druid.segment.realtime.SegmentGenerationMetrics;
import org.apache.druid.segment.realtime.appenderator.Appenderator;
import org.apache.druid.segment.realtime.appenderator.AppenderatorDriverAddResult;
import org.apache.druid.segment.realtime.appenderator.SegmentsAndCommitMetadata;
import org.apache.druid.segment.realtime.appenderator.StreamAppenderator;
import org.apache.druid.segment.realtime.appenderator.StreamAppenderatorDriver;
-import org.apache.druid.segment.realtime.firehose.ChatHandler;
import org.apache.druid.server.security.Access;
import org.apache.druid.server.security.Action;
import org.apache.druid.server.security.AuthorizerMapper;
diff --git a/indexing-service/src/test/java/org/apache/druid/indexing/common/TaskToolboxTest.java b/indexing-service/src/test/java/org/apache/druid/indexing/common/TaskToolboxTest.java
index e1ac9482436b..a4f297c8e335 100644
--- a/indexing-service/src/test/java/org/apache/druid/indexing/common/TaskToolboxTest.java
+++ b/indexing-service/src/test/java/org/apache/druid/indexing/common/TaskToolboxTest.java
@@ -51,9 +51,9 @@
import org.apache.druid.segment.loading.SegmentLoaderConfig;
import org.apache.druid.segment.loading.SegmentLocalCacheManager;
import org.apache.druid.segment.metadata.CentralizedDatasourceSchemaConfig;
+import org.apache.druid.segment.realtime.NoopChatHandlerProvider;
import org.apache.druid.segment.realtime.appenderator.AppenderatorsManager;
import org.apache.druid.segment.realtime.appenderator.UnifiedIndexerAppenderatorsManager;
-import org.apache.druid.segment.realtime.firehose.NoopChatHandlerProvider;
import org.apache.druid.server.DruidNode;
import org.apache.druid.server.coordination.DataSegmentAnnouncer;
import org.apache.druid.server.coordination.DataSegmentServerAnnouncer;
diff --git a/indexing-service/src/test/java/org/apache/druid/indexing/common/TestFirehose.java b/indexing-service/src/test/java/org/apache/druid/indexing/common/TestFirehose.java
deleted file mode 100644
index 1572373151bf..000000000000
--- a/indexing-service/src/test/java/org/apache/druid/indexing/common/TestFirehose.java
+++ /dev/null
@@ -1,118 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied. See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-
-package org.apache.druid.indexing.common;
-
-import org.apache.druid.data.input.Firehose;
-import org.apache.druid.data.input.FirehoseFactory;
-import org.apache.druid.data.input.InputRow;
-import org.apache.druid.data.input.impl.InputRowParser;
-import org.apache.druid.data.input.impl.StringInputRowParser;
-import org.apache.druid.java.util.common.parsers.ParseException;
-
-import java.io.File;
-import java.util.ArrayDeque;
-import java.util.Collections;
-import java.util.Deque;
-import java.util.List;
-import java.util.Optional;
-
-public class TestFirehose implements Firehose
-{
- public static class TestFirehoseFactory implements FirehoseFactory
- {
- private boolean waitForClose = true;
-
- @Override
- @SuppressWarnings("unchecked")
- public Firehose connect(InputRowParser parser, File temporaryDirectory) throws ParseException
- {
- return new TestFirehose(parser, waitForClose, Collections.emptyList());
- }
- }
-
- public static final String FAIL_DIM = "__fail__";
-
- private final Deque> queue = new ArrayDeque<>();
-
- private InputRowParser parser;
- private boolean closed;
-
- private TestFirehose(InputRowParser parser, boolean waitForClose, List
diff --git a/processing/src/main/java/org/apache/druid/data/input/Firehose.java b/processing/src/main/java/org/apache/druid/data/input/Firehose.java
deleted file mode 100644
--- a/processing/src/main/java/org/apache/druid/data/input/Firehose.java
+++ /dev/null
- */
-@Deprecated
-@ExtensionPoint
-public interface Firehose extends Closeable
-{
- /**
- * Returns whether there are more rows to process. This is used to indicate that another item is immediately
- * available via {@link #nextRow()}. Thus, if the stream is still available but there are no new messages on it, this
- * call should block until a new message is available.
- *
- * If something happens such that the stream is no longer available, this should return false.
- *
- * @return true if and when there is another row available, false if the stream has dried up
- */
- boolean hasMore() throws IOException;
-
- /**
- * The next row available. Should only be called if hasMore returns true.
- * The return value can be null which means the caller must skip this row.
- *
- * @return The next row
- */
- @Nullable
- InputRow nextRow() throws IOException;
-
- /**
- * Returns an {@link InputRowListPlusRawValues} object containing the InputRow plus the raw, unparsed data corresponding to
- * the next row available. Used in the sampler to provide the caller with information to assist in configuring a parse
- * spec. If a ParseException is thrown by the parser, it should be caught and returned in the InputRowListPlusRawValues so
- * we will be able to provide information on the raw row which failed to be parsed. Should only be called if hasMore
- * returns true.
- *
- * @return an InputRowListPlusRawValues which may contain any of: an InputRow, map of the raw data, or a ParseException
- */
- @Deprecated
- default InputRowListPlusRawValues nextRowWithRaw() throws IOException
- {
- try {
- return InputRowListPlusRawValues.of(nextRow(), null);
- }
- catch (ParseException e) {
- return InputRowListPlusRawValues.of(null, e);
- }
- }
-
- /**
- * Closes the "ingestion side" of the Firehose, potentially concurrently with calls to {@link #hasMore()} and {@link
- * #nextRow()} being made from a different thread. {@link #hasMore()} and {@link #nextRow()}
- * continue to work after close(), but since the ingestion side is closed rows will eventually run out.
- */
- @Override
- void close() throws IOException;
-}
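For readers tracking the migration, the interface deleted above is a pull-style API: callers loop on hasMore() and fetch rows with nextRow(), and close() shuts off the ingestion side. Its replacement, the InputSource family, instead hands back an iterator of parsed rows. The sketch below contrasts the two consumption styles using throwaway interfaces defined inline; it is the shape of the change, not Druid's actual API.

```java
// Minimal sketch (not Druid's actual API surface) of the migration this removal implies:
// the deleted Firehose exposed a pull-style hasMore()/nextRow() pair, while the
// InputSource model that replaces it hands back an iterator of parsed rows.
import java.io.Closeable;
import java.io.IOException;
import java.util.Iterator;
import java.util.List;

public class FirehoseStyleVsIteratorSketch
{
  // Shape of the removed abstraction, reduced to its two core methods.
  interface PullStyleSource extends Closeable
  {
    boolean hasMore() throws IOException;

    String nextRow() throws IOException;
  }

  // Shape of the replacement: a single read() call returning an iterator of rows.
  interface IteratorStyleReader
  {
    Iterator<String> read() throws IOException;
  }

  static void consumePullStyle(PullStyleSource source) throws IOException
  {
    try (source) {
      while (source.hasMore()) {
        System.out.println("row: " + source.nextRow());
      }
    }
  }

  static void consumeIteratorStyle(IteratorStyleReader reader) throws IOException
  {
    Iterator<String> rows = reader.read();
    while (rows.hasNext()) {
      System.out.println("row: " + rows.next());
    }
  }

  public static void main(String[] args) throws IOException
  {
    List<String> data = List.of("2024-01-01,click", "2024-01-01,view");

    consumePullStyle(new PullStyleSource()
    {
      private final Iterator<String> it = data.iterator();

      @Override
      public boolean hasMore()
      {
        return it.hasNext();
      }

      @Override
      public String nextRow()
      {
        return it.next();
      }

      @Override
      public void close()
      {
        // nothing to release in this in-memory example
      }
    });

    consumeIteratorStyle(data::iterator);
  }
}
```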
diff --git a/processing/src/main/java/org/apache/druid/data/input/FirehoseFactory.java b/processing/src/main/java/org/apache/druid/data/input/FirehoseFactory.java
deleted file mode 100644
index 6ec8657d435a..000000000000
--- a/processing/src/main/java/org/apache/druid/data/input/FirehoseFactory.java
+++ /dev/null
@@ -1,82 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied. See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-
-package org.apache.druid.data.input;
-
-import com.fasterxml.jackson.annotation.JsonIgnore;
-import com.fasterxml.jackson.annotation.JsonTypeInfo;
-import org.apache.druid.data.input.impl.InputRowParser;
-import org.apache.druid.guice.annotations.ExtensionPoint;
-import org.apache.druid.java.util.common.parsers.ParseException;
-
-import javax.annotation.Nullable;
-import java.io.File;
-import java.io.IOException;
-
-/**
- * FirehoseFactory creates a {@link Firehose} which is an interface holding onto the stream of incoming data.
- * It currently provides two methods for creating a {@link Firehose} and their default implementations call each other
- * for the backward compatibility. Implementations of this interface must implement one of these methods.
- *
- * This class is deprecated in favor of {@link InputSource}
- */
-@Deprecated
-@ExtensionPoint
-@JsonTypeInfo(use = JsonTypeInfo.Id.NAME, property = "type")
-public interface FirehoseFactory<T extends InputRowParser>
-{
- /**
- * Initialization method that connects up the fire hose. If this method returns successfully it should be safe to
- * call hasMore() on the returned Firehose (which might subsequently block).
- *
- * If this method returns null, then any attempt to call hasMore(), nextRow() and close() on the return
- * value will throw a surprising NPE. Throwing IOException on connection failure or runtime exception on
- * invalid configuration is preferred over returning null.
- *
- * @param parser an input row parser
- */
- @Deprecated
- default Firehose connect(T parser) throws IOException, ParseException
- {
- return connect(parser, null);
- }
-
- /**
- * Initialization method that connects up the fire hose. If this method returns successfully it should be safe to
- * call hasMore() on the returned Firehose (which might subsequently block).
- *
- * If this method returns null, then any attempt to call hasMore(), nextRow() and close() on the return
- * value will throw a surprising NPE. Throwing IOException on connection failure or runtime exception on
- * invalid configuration is preferred over returning null.
- *
- * @param parser an input row parser
- * @param temporaryDirectory a directory where temporary files are stored
- */
- default Firehose connect(T parser, @Nullable File temporaryDirectory) throws IOException, ParseException
- {
- return connect(parser);
- }
-
- @SuppressWarnings("unused")
- @JsonIgnore
- default boolean isSplittable()
- {
- return false;
- }
-}
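The deleted factory's javadoc points out a pattern worth keeping in mind when evolving extension points: the two connect overloads default to calling each other, so existing implementations keep working as long as they override at least one of them, while overriding neither would recurse forever. A hedged sketch of that mutual-default pattern, with made-up names, follows.

```java
// Sketch of the default-method pattern the deleted factory used: two overloads whose
// defaults call each other, so an implementor overrides whichever one it needs. The
// interface below is illustrative, not a Druid class.
import java.io.File;
import java.io.IOException;
import java.util.List;

public class MutualDefaultSketch
{
  interface RowSourceFactory
  {
    // Older overload: defaults to the newer one with no temporary directory.
    default Iterable<String> connect(String parserSpec) throws IOException
    {
      return connect(parserSpec, null);
    }

    // Newer overload: defaults back to the older one, ignoring the directory.
    // An implementation must override at least one of the two, otherwise the
    // defaults call each other forever.
    default Iterable<String> connect(String parserSpec, File temporaryDirectory) throws IOException
    {
      return connect(parserSpec);
    }
  }

  public static void main(String[] args) throws IOException
  {
    // Overrides only the newer overload; callers of either method still work.
    RowSourceFactory factory = new RowSourceFactory()
    {
      @Override
      public Iterable<String> connect(String parserSpec, File temporaryDirectory)
      {
        return List.of("row-1", "row-2");
      }
    };

    System.out.println(factory.connect("csv"));
    System.out.println(factory.connect("csv", new File("/tmp")));
  }
}
```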
diff --git a/processing/src/main/java/org/apache/druid/data/input/SegmentsSplitHintSpec.java b/processing/src/main/java/org/apache/druid/data/input/SegmentsSplitHintSpec.java
index ed59d67285a6..433a6a2f9958 100644
--- a/processing/src/main/java/org/apache/druid/data/input/SegmentsSplitHintSpec.java
+++ b/processing/src/main/java/org/apache/druid/data/input/SegmentsSplitHintSpec.java
@@ -30,11 +30,10 @@
import java.util.function.Function;
/**
- * {@link SplitHintSpec} for IngestSegmentFirehoseFactory and DruidInputSource.
+ * {@link SplitHintSpec} for DruidInputSource.
*
* In DruidInputSource, this spec is converted into {@link MaxSizeSplitHintSpec}. As a result, its {@link #split}
- * method is never called (IngestSegmentFirehoseFactory creates splits on its own instead of calling the
- * {@code split()} method). This doesn't necessarily mean this class is deprecated in favor of the MaxSizeSplitHintSpec.
+ * method is never called. This doesn't necessarily mean this class is deprecated in favor of the MaxSizeSplitHintSpec.
* We may want to create more optimized splits in the future. For example, segments can be split to maximize the rollup
* ratio if the segments have different sets of columns or even different value ranges of columns.
*/
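As the updated javadoc notes, this hint is converted into a max-size style split in DruidInputSource: segments are packed into splits until a byte budget is reached. The toy sketch below shows that packing logic in isolation; the cap and names are invented for the example and this is not the real MaxSizeSplitHintSpec implementation.

```java
// Illustrative only: a toy version of the max-size grouping that a split hint like
// MaxSizeSplitHintSpec implies, packing segment sizes into splits until a byte cap
// is reached. The names and the 500_000_000 cap are made up for the example.
import java.util.ArrayList;
import java.util.List;

public class MaxSizeSplitSketch
{
  static List<List<Long>> split(List<Long> segmentSizes, long maxBytesPerSplit)
  {
    List<List<Long>> splits = new ArrayList<>();
    List<Long> current = new ArrayList<>();
    long currentBytes = 0;

    for (long size : segmentSizes) {
      // Start a new split once adding this segment would exceed the cap
      // (a split always gets at least one segment, even an oversized one).
      if (!current.isEmpty() && currentBytes + size > maxBytesPerSplit) {
        splits.add(current);
        current = new ArrayList<>();
        currentBytes = 0;
      }
      current.add(size);
      currentBytes += size;
    }
    if (!current.isEmpty()) {
      splits.add(current);
    }
    return splits;
  }

  public static void main(String[] args)
  {
    List<Long> segmentSizes = List.of(300_000_000L, 250_000_000L, 100_000_000L, 600_000_000L);
    System.out.println(split(segmentSizes, 500_000_000L));
  }
}
```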
diff --git a/processing/src/main/java/org/apache/druid/segment/Metadata.java b/processing/src/main/java/org/apache/druid/segment/Metadata.java
index e6b5b1f65883..c3ac554a0faf 100644
--- a/processing/src/main/java/org/apache/druid/segment/Metadata.java
+++ b/processing/src/main/java/org/apache/druid/segment/Metadata.java
@@ -40,7 +40,7 @@
public class Metadata
{
// container is used for arbitrary key-value pairs in segment metadata e.g.
- // kafka firehose uses it to store commit offset
+ // kafka input reader uses it to store commit offset
private final Map<String, Object> container;
@Nullable
private final AggregatorFactory[] aggregators;
diff --git a/processing/src/test/java/org/apache/druid/data/input/impl/InputEntityIteratingReaderTest.java b/processing/src/test/java/org/apache/druid/data/input/impl/InputEntityIteratingReaderTest.java
index 744c29dba2a1..80602d0508ad 100644
--- a/processing/src/test/java/org/apache/druid/data/input/impl/InputEntityIteratingReaderTest.java
+++ b/processing/src/test/java/org/apache/druid/data/input/impl/InputEntityIteratingReaderTest.java
@@ -187,7 +187,7 @@ public void testSampleWithSystemFields() throws IOException
@Test
public void testIncorrectURI() throws IOException, URISyntaxException
{
- final InputEntityIteratingReader firehose = new InputEntityIteratingReader(
+ final InputEntityIteratingReader inputReader = new InputEntityIteratingReader(
new InputRowSchema(
new TimestampSpec(null, null, null),
new DimensionsSpec(
@@ -220,7 +220,7 @@ protected int getMaxRetries()
temporaryFolder.newFolder()
);
- try (CloseableIterator<InputRow> readIterator = firehose.read()) {
+ try (CloseableIterator<InputRow> readIterator = inputReader.read()) {
String expectedMessage = "Error occurred while trying to read uri: testscheme://some/path";
Exception exception = Assert.assertThrows(RuntimeException.class, readIterator::hasNext);
Assert.assertTrue(exception.getMessage().contains(expectedMessage));
diff --git a/server/src/main/java/org/apache/druid/guice/FirehoseModule.java b/server/src/main/java/org/apache/druid/guice/FirehoseModule.java
deleted file mode 100644
index fe6461bf2158..000000000000
--- a/server/src/main/java/org/apache/druid/guice/FirehoseModule.java
+++ /dev/null
@@ -1,55 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied. See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-
-package org.apache.druid.guice;
-
-import com.fasterxml.jackson.databind.Module;
-import com.fasterxml.jackson.databind.jsontype.NamedType;
-import com.fasterxml.jackson.databind.module.SimpleModule;
-import com.google.inject.Binder;
-import org.apache.druid.initialization.DruidModule;
-import org.apache.druid.segment.realtime.firehose.ClippedFirehoseFactory;
-import org.apache.druid.segment.realtime.firehose.EventReceiverFirehoseFactory;
-import org.apache.druid.segment.realtime.firehose.FixedCountFirehoseFactory;
-import org.apache.druid.segment.realtime.firehose.TimedShutoffFirehoseFactory;
-
-import java.util.Collections;
-import java.util.List;
-
-public class FirehoseModule implements DruidModule
-{
- @Override
- public void configure(Binder binder)
- {
- }
-
- @Override
- public List<? extends Module> getJacksonModules()
- {
- return Collections.singletonList(
- new SimpleModule("FirehoseModule")
- .registerSubtypes(
- new NamedType(ClippedFirehoseFactory.class, "clipped"),
- new NamedType(TimedShutoffFirehoseFactory.class, "timed"),
- new NamedType(EventReceiverFirehoseFactory.class, "receiver"),
- new NamedType(FixedCountFirehoseFactory.class, "fixedCount")
- )
- );
- }
-}
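The removed Guice module did nothing but Jackson subtype registration: it bound JSON "type" names such as "clipped" and "timed" to concrete firehose classes so they could be deserialized polymorphically. The sketch below reproduces the same SimpleModule/NamedType mechanism with stand-in classes; the RowSource types are illustrative, not Druid classes.

```java
// Sketch of the Jackson mechanism the deleted module relied on: registering named
// subtypes against a polymorphic base so a "type" field in JSON selects the concrete
// class. The Clipped/Timed classes here are stand-ins, not the removed Druid ones.
import com.fasterxml.jackson.annotation.JsonProperty;
import com.fasterxml.jackson.annotation.JsonTypeInfo;
import com.fasterxml.jackson.databind.ObjectMapper;
import com.fasterxml.jackson.databind.jsontype.NamedType;
import com.fasterxml.jackson.databind.module.SimpleModule;

public class SubtypeRegistrationSketch
{
  @JsonTypeInfo(use = JsonTypeInfo.Id.NAME, property = "type")
  interface RowSource
  {
  }

  static class ClippedRowSource implements RowSource
  {
    @JsonProperty
    String interval;
  }

  static class TimedRowSource implements RowSource
  {
    @JsonProperty
    long shutoffMillis;
  }

  public static void main(String[] args) throws Exception
  {
    ObjectMapper mapper = new ObjectMapper();
    // Equivalent of the deleted module's getJacksonModules(): bind "type" names to classes.
    mapper.registerModule(
        new SimpleModule("RowSourceModule")
            .registerSubtypes(
                new NamedType(ClippedRowSource.class, "clipped"),
                new NamedType(TimedRowSource.class, "timed")
            )
    );

    RowSource source = mapper.readValue("{\"type\": \"timed\", \"shutoffMillis\": 60000}", RowSource.class);
    System.out.println(source.getClass().getSimpleName());
  }
}
```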
diff --git a/server/src/main/java/org/apache/druid/initialization/CoreInjectorBuilder.java b/server/src/main/java/org/apache/druid/initialization/CoreInjectorBuilder.java
index 59516f8d4c49..e07ac5ed1155 100644
--- a/server/src/main/java/org/apache/druid/initialization/CoreInjectorBuilder.java
+++ b/server/src/main/java/org/apache/druid/initialization/CoreInjectorBuilder.java
@@ -29,7 +29,6 @@
import org.apache.druid.guice.DruidSecondaryModule;
import org.apache.druid.guice.ExpressionModule;
import org.apache.druid.guice.ExtensionsModule;
-import org.apache.druid.guice.FirehoseModule;
import org.apache.druid.guice.JacksonConfigManagerModule;
import org.apache.druid.guice.JavaScriptModule;
import org.apache.druid.guice.LifecycleModule;
@@ -123,7 +122,6 @@ public CoreInjectorBuilder forServer()
new CoordinatorDiscoveryModule(),
new LocalDataStorageDruidModule(),
new TombstoneDataStorageModule(),
- new FirehoseModule(),
new JavaScriptModule(),
new AuthenticatorModule(),
new AuthenticatorMapperModule(),
diff --git a/server/src/main/java/org/apache/druid/metadata/BasicDataSourceExt.java b/server/src/main/java/org/apache/druid/metadata/BasicDataSourceExt.java
index 6ddfd378fb43..cff9308a6971 100644
--- a/server/src/main/java/org/apache/druid/metadata/BasicDataSourceExt.java
+++ b/server/src/main/java/org/apache/druid/metadata/BasicDataSourceExt.java
@@ -50,7 +50,7 @@ public class BasicDataSourceExt extends BasicDataSource
* Note that these properties are not currently checked against any security configuration such as
* an allow list for JDBC properties. Instead, they are supposed to be checked before adding to this class.
*
- * @see SQLFirehoseDatabaseConnector#validateConfigs
+ * @see SQLInputSourceDatabaseConnector#validateConfigs
*/
private Properties connectionProperties;
diff --git a/server/src/main/java/org/apache/druid/metadata/SQLFirehoseDatabaseConnector.java b/server/src/main/java/org/apache/druid/metadata/SQLInputSourceDatabaseConnector.java
similarity index 98%
rename from server/src/main/java/org/apache/druid/metadata/SQLFirehoseDatabaseConnector.java
rename to server/src/main/java/org/apache/druid/metadata/SQLInputSourceDatabaseConnector.java
index 11d467323f43..02c2d4269c2c 100644
--- a/server/src/main/java/org/apache/druid/metadata/SQLFirehoseDatabaseConnector.java
+++ b/server/src/main/java/org/apache/druid/metadata/SQLInputSourceDatabaseConnector.java
@@ -38,7 +38,7 @@
import java.util.Set;
@JsonTypeInfo(use = JsonTypeInfo.Id.NAME, property = "type")
-public abstract class SQLFirehoseDatabaseConnector
+public abstract class SQLInputSourceDatabaseConnector
{
static final int MAX_RETRIES = 10;
diff --git a/server/src/main/java/org/apache/druid/metadata/input/SqlEntity.java b/server/src/main/java/org/apache/druid/metadata/input/SqlEntity.java
index abc64baae5a6..0d55e0ed7b59 100644
--- a/server/src/main/java/org/apache/druid/metadata/input/SqlEntity.java
+++ b/server/src/main/java/org/apache/druid/metadata/input/SqlEntity.java
@@ -27,7 +27,7 @@
import org.apache.druid.java.util.common.StringUtils;
import org.apache.druid.java.util.common.jackson.JacksonUtils;
import org.apache.druid.java.util.common.logger.Logger;
-import org.apache.druid.metadata.SQLFirehoseDatabaseConnector;
+import org.apache.druid.metadata.SQLInputSourceDatabaseConnector;
import org.apache.druid.metadata.SQLMetadataStorageActionHandler;
import org.skife.jdbi.v2.ResultIterator;
import org.skife.jdbi.v2.exceptions.ResultSetException;
@@ -52,19 +52,19 @@ public class SqlEntity implements InputEntity
private final String sql;
private final ObjectMapper objectMapper;
- private final SQLFirehoseDatabaseConnector sqlFirehoseDatabaseConnector;
+ private final SQLInputSourceDatabaseConnector sqlInputSourceDatabaseConnector;
private final boolean foldCase;
public SqlEntity(
String sql,
- SQLFirehoseDatabaseConnector sqlFirehoseDatabaseConnector,
+ SQLInputSourceDatabaseConnector sqlInputSourceDatabaseConnector,
boolean foldCase,
ObjectMapper objectMapper
)
{
this.sql = sql;
- this.sqlFirehoseDatabaseConnector = Preconditions.checkNotNull(
- sqlFirehoseDatabaseConnector,
+ this.sqlInputSourceDatabaseConnector = Preconditions.checkNotNull(
+ sqlInputSourceDatabaseConnector,
"SQL Metadata Connector not configured!"
);
this.foldCase = foldCase;
@@ -93,7 +93,7 @@ public InputStream open()
public CleanableFile fetch(File temporaryDirectory, byte[] fetchBuffer) throws IOException
{
final File tempFile = File.createTempFile("druid-sql-entity", ".tmp", temporaryDirectory);
- return openCleanableFile(sql, sqlFirehoseDatabaseConnector, objectMapper, foldCase, tempFile);
+ return openCleanableFile(sql, sqlInputSourceDatabaseConnector, objectMapper, foldCase, tempFile);
}
@@ -102,7 +102,7 @@ public CleanableFile fetch(File temporaryDirectory, byte[] fetchBuffer) throws I
* The result file is deleted if the query execution or the file write fails.
*
* @param sql The SQL query to be executed
- * @param sqlFirehoseDatabaseConnector The database connector
+ * @param sqlInputSourceDatabaseConnector The database connector
* @param objectMapper An object mapper, used for deserialization
* @param foldCase A boolean flag used to enable or disable case sensitivity while handling database column names
*
@@ -111,7 +111,7 @@ public CleanableFile fetch(File temporaryDirectory, byte[] fetchBuffer) throws I
public static CleanableFile openCleanableFile(
String sql,
- SQLFirehoseDatabaseConnector sqlFirehoseDatabaseConnector,
+ SQLInputSourceDatabaseConnector sqlInputSourceDatabaseConnector,
ObjectMapper objectMapper,
boolean foldCase,
File tempFile
@@ -124,7 +124,7 @@ public static CleanableFile openCleanableFile(
// Execute the sql query and lazily retrieve the results into the file in json format.
// foldCase is useful to handle differences in case sensitivity behavior across databases.
- sqlFirehoseDatabaseConnector.retryWithHandle(
+ sqlInputSourceDatabaseConnector.retryWithHandle(
(handle) -> {
ResultIterator