
Commit

Introduce Segment Schema Publishing and Polling for Efficient Datasource Schema Building (#15817)

Issue: #14989

The initial step in optimizing segment metadata was to centralize the construction of datasource schema in the Coordinator (#14985). Next, we addressed publishing schemas for realtime segments (#15475). The remaining goal is to eliminate the need to periodically run queries to obtain segment schema information.

This is the final change, which publishes segment schemas for finalized segments from tasks and periodically polls them in the Coordinator.
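
Based on the configuration docs updated in this commit, enabling centralized datasource schema building end to end would look roughly like the following runtime properties. This is a minimal sketch, not a complete cluster configuration:

```properties
# Coordinator runtime.properties: build datasource schemas on the Coordinator
druid.centralizedDatasourceSchema.enabled=true

# MiddleManager runtime.properties: propagate the flag to the task peons it launches
druid.indexer.fork.property.druid.centralizedDatasourceSchema.enabled=true
```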
findingrish authored Apr 24, 2024
1 parent 080476f commit e30790e
Showing 210 changed files with 9,306 additions and 1,889 deletions.
4 changes: 2 additions & 2 deletions .github/workflows/standard-its.yml
@@ -47,7 +47,7 @@ jobs:
strategy:
fail-fast: false
matrix:
testing_group: [batch-index, input-format, input-source, perfect-rollup-parallel-batch-index, kafka-index, kafka-index-slow, kafka-transactional-index, kafka-transactional-index-slow, kafka-data-format, ldap-security, realtime-index, append-ingestion, compaction]
testing_group: [batch-index, input-format, input-source, perfect-rollup-parallel-batch-index, kafka-index, kafka-index-slow, kafka-transactional-index, kafka-transactional-index-slow, kafka-data-format, ldap-security, realtime-index, append-ingestion, compaction, cds-task-schema-publish-disabled, cds-coordinator-smq-disabled]
uses: ./.github/workflows/reusable-standard-its.yml
if: ${{ needs.changes.outputs.core == 'true' || needs.changes.outputs.common-extensions == 'true' }}
with:
@@ -196,6 +196,6 @@ jobs:
with:
build_jdk: 8
runtime_jdk: 8
testing_groups: -DexcludedGroups=batch-index,input-format,input-source,perfect-rollup-parallel-batch-index,kafka-index,query,query-retry,query-error,realtime-index,security,ldap-security,s3-deep-storage,gcs-deep-storage,azure-deep-storage,hdfs-deep-storage,s3-ingestion,kinesis-index,kinesis-data-format,kafka-transactional-index,kafka-index-slow,kafka-transactional-index-slow,kafka-data-format,hadoop-s3-to-s3-deep-storage,hadoop-s3-to-hdfs-deep-storage,hadoop-azure-to-azure-deep-storage,hadoop-azure-to-hdfs-deep-storage,hadoop-gcs-to-gcs-deep-storage,hadoop-gcs-to-hdfs-deep-storage,aliyun-oss-deep-storage,append-ingestion,compaction,high-availability,upgrade,shuffle-deep-store,custom-coordinator-duties,centralized-datasource-schema
testing_groups: -DexcludedGroups=batch-index,input-format,input-source,perfect-rollup-parallel-batch-index,kafka-index,query,query-retry,query-error,realtime-index,security,ldap-security,s3-deep-storage,gcs-deep-storage,azure-deep-storage,hdfs-deep-storage,s3-ingestion,kinesis-index,kinesis-data-format,kafka-transactional-index,kafka-index-slow,kafka-transactional-index-slow,kafka-data-format,hadoop-s3-to-s3-deep-storage,hadoop-s3-to-hdfs-deep-storage,hadoop-azure-to-azure-deep-storage,hadoop-azure-to-hdfs-deep-storage,hadoop-gcs-to-gcs-deep-storage,hadoop-gcs-to-hdfs-deep-storage,aliyun-oss-deep-storage,append-ingestion,compaction,high-availability,upgrade,shuffle-deep-store,custom-coordinator-duties,centralized-datasource-schema,cds-task-schema-publish-disabled,cds-coordinator-smq-disabled
use_indexer: ${{ matrix.indexer }}
group: other
@@ -32,6 +32,7 @@
import org.apache.druid.query.metadata.metadata.SegmentAnalysis;
import org.apache.druid.segment.column.ColumnType;
import org.apache.druid.segment.join.JoinableFactory;
import org.apache.druid.segment.metadata.CentralizedDatasourceSchemaConfig;
import org.apache.druid.server.QueryLifecycleFactory;
import org.apache.druid.server.SegmentManager;
import org.apache.druid.server.coordination.DruidServerMetadata;
@@ -91,7 +92,8 @@ public SegmentMetadataCacheForBenchmark(
brokerInternalQueryConfig,
new NoopServiceEmitter(),
new PhysicalDatasourceMetadataFactory(joinableFactory, segmentManager),
new NoopCoordinatorClient()
new NoopCoordinatorClient(),
CentralizedDatasourceSchemaConfig.create()
);
}

3 changes: 2 additions & 1 deletion docs/configuration/index.md
@@ -878,7 +878,7 @@ These Coordinator static configurations can be defined in the `coordinator/runti
|`druid.coordinator.loadqueuepeon.repeatDelay`|The start and repeat delay for the `loadqueuepeon`, which manages the load and drop of segments.|`PT0.050S` (50 ms)|
|`druid.coordinator.asOverlord.enabled`|Boolean value for whether this Coordinator service should act like an Overlord as well. This configuration allows users to simplify a Druid cluster by not having to deploy any standalone Overlord services. If set to true, then Overlord console is available at `http://coordinator-host:port/console.html` and be sure to set `druid.coordinator.asOverlord.overlordService` also.|false|
|`druid.coordinator.asOverlord.overlordService`| Required, if `druid.coordinator.asOverlord.enabled` is `true`. This must be same value as `druid.service` on standalone Overlord services and `druid.selectors.indexing.serviceName` on Middle Managers.|NULL|
|`druid.centralizedDatasourceSchema.enabled`|Boolean flag for enabling datasource schema building on the Coordinator.|false|
|`druid.centralizedDatasourceSchema.enabled`|Boolean flag for enabling datasource schema building on the Coordinator. Note: when using the MiddleManager to launch tasks, also set `druid.indexer.fork.property.druid.centralizedDatasourceSchema.enabled` in the MiddleManager runtime config.|false|

##### Metadata management

@@ -1435,6 +1435,7 @@ MiddleManagers pass their configurations down to their child peons. The MiddleMa
|`druid.worker.baseTaskDirs`|List of base temporary working directories, one of which is assigned per task in a round-robin fashion. This property can be used to allow usage of multiple disks for indexing. This property is recommended in place of and takes precedence over `${druid.indexer.task.baseTaskDir}`. If this configuration is not set, `${druid.indexer.task.baseTaskDir}` is used. For example, `druid.worker.baseTaskDirs=[\"PATH1\",\"PATH2\",...]`.|null|
|`druid.worker.baseTaskDirSize`|The total amount of bytes that can be used by tasks on any single task dir. This value is treated symmetrically across all directories, that is, if this is 500 GB and there are 3 `baseTaskDirs`, then each of those task directories is assumed to allow for 500 GB to be used and a total of 1.5 TB will potentially be available across all tasks. The actual amount of memory assigned to each task is discussed in [Configuring task storage sizes](../ingestion/tasks.md#configuring-task-storage-sizes)|`Long.MAX_VALUE`|
|`druid.worker.category`|A string to name the category that the MiddleManager node belongs to.|`_default_worker_category`|
|`druid.indexer.fork.property.druid.centralizedDatasourceSchema.enabled`|Set this property when the centralized datasource schema feature (`druid.centralizedDatasourceSchema.enabled`) is enabled on the Coordinator.|false|

#### Peon processing

6 changes: 6 additions & 0 deletions docs/operations/metrics.md
@@ -75,6 +75,12 @@ Most metric values reset each emission period, as specified in `druid.monitoring
|`metadatacache/schemaPoll/count`|Number of coordinator polls to fetch datasource schema.||
|`metadatacache/schemaPoll/failed`|Number of failed coordinator polls to fetch datasource schema.||
|`metadatacache/schemaPoll/time`|Time taken for coordinator polls to fetch datasource schema.||
|`metadatacache/backfill/count`|Number of segments for which schema was backfilled in the database.|`dataSource`|
|`schemacache/realtime/count`|Number of realtime segments for which schema is cached.||Depends on the number of realtime segments.|
|`schemacache/finalizedSegmentMetadata/count`|Number of finalized segments for which schema metadata is cached.||Depends on the number of segments in the cluster.|
|`schemacache/finalizedSchemaPayload/count`|Number of finalized segment schemas cached.||Depends on the number of distinct schemas in the cluster.|
|`schemacache/inTransitSMQResults/count`|Number of segments for which schema was fetched by executing a segment metadata query.||Eventually it should be 0.|
|`schemacache/inTransitSMQPublishedResults/count`|Number of segments for which schema is cached after backfilling in the database.||Eventually it should be 0.|
|`serverview/sync/healthy`|Sync status of the Broker with a segment-loading server such as a Historical or Peon. Emitted only when [HTTP-based server view](../configuration/index.md#segment-management) is enabled. This metric can be used in conjunction with `serverview/sync/unstableTime` to debug slow startup of Brokers.|`server`, `tier`|1 for fully synced servers, 0 otherwise|
|`serverview/sync/unstableTime`|Time in milliseconds for which the Broker has been failing to sync with a segment-loading server. Emitted only when [HTTP-based server view](../configuration/index.md#segment-management) is enabled.|`server`, `tier`|Not emitted for synced servers.|
|`subquery/rowLimit/count`|Number of subqueries whose results are materialized as rows (Java objects on heap).|This metric is only available if the `SubqueryCountStatsMonitor` module is included.| |
@@ -50,6 +50,8 @@
import org.apache.druid.query.aggregation.LongSumAggregatorFactory;
import org.apache.druid.segment.TestHelper;
import org.apache.druid.segment.indexing.DataSchema;
import org.apache.druid.segment.metadata.CentralizedDatasourceSchemaConfig;
import org.apache.druid.segment.metadata.SegmentSchemaManager;
import org.apache.druid.segment.realtime.firehose.ChatHandlerProvider;
import org.apache.druid.segment.transform.TransformSpec;
import org.apache.druid.server.security.AuthorizerMapper;
@@ -88,19 +90,28 @@ public class MaterializedViewSupervisorTest
private String derivativeDatasourceName;
private MaterializedViewSupervisorSpec spec;
private final ObjectMapper objectMapper = TestHelper.makeJsonMapper();
private SegmentSchemaManager segmentSchemaManager;

@Before
public void setUp()
{
TestDerbyConnector derbyConnector = derbyConnectorRule.getConnector();
derbyConnector.createDataSourceTable();
derbyConnector.createSegmentSchemasTable();
derbyConnector.createSegmentTable();
taskStorage = EasyMock.createMock(TaskStorage.class);
taskMaster = EasyMock.createMock(TaskMaster.class);
segmentSchemaManager = new SegmentSchemaManager(
derbyConnectorRule.metadataTablesConfigSupplier().get(),
objectMapper,
derbyConnector
);
indexerMetadataStorageCoordinator = new IndexerSQLMetadataStorageCoordinator(
objectMapper,
derbyConnectorRule.metadataTablesConfigSupplier().get(),
derbyConnector
derbyConnector,
segmentSchemaManager,
CentralizedDatasourceSchemaConfig.create()
);
metadataSupervisorManager = EasyMock.createMock(MetadataSupervisorManager.class);
sqlSegmentsMetadataManager = EasyMock.createMock(SqlSegmentsMetadataManager.class);
@@ -142,8 +153,8 @@ public void testCheckSegments() throws IOException
final Interval day1 = baseSegments.get(0).getInterval();
final Interval day2 = new Interval(day1.getStart().plusDays(1), day1.getEnd().plusDays(1));

indexerMetadataStorageCoordinator.commitSegments(new HashSet<>(baseSegments));
indexerMetadataStorageCoordinator.commitSegments(derivativeSegments);
indexerMetadataStorageCoordinator.commitSegments(new HashSet<>(baseSegments), null);
indexerMetadataStorageCoordinator.commitSegments(derivativeSegments, null);
EasyMock.expect(taskMaster.getTaskQueue()).andReturn(Optional.of(taskQueue)).anyTimes();
EasyMock.expect(taskMaster.getTaskRunner()).andReturn(Optional.absent()).anyTimes();
EasyMock.expect(taskStorage.getActiveTasks()).andReturn(ImmutableList.of()).anyTimes();
@@ -165,8 +176,8 @@ public void testSubmitTasksDoesNotFailIfTaskAlreadyExists() throws IOException
Set<DataSegment> baseSegments = Sets.newHashSet(createBaseSegments());
Set<DataSegment> derivativeSegments = Sets.newHashSet(createDerivativeSegments());

indexerMetadataStorageCoordinator.commitSegments(baseSegments);
indexerMetadataStorageCoordinator.commitSegments(derivativeSegments);
indexerMetadataStorageCoordinator.commitSegments(baseSegments, null);
indexerMetadataStorageCoordinator.commitSegments(derivativeSegments, null);
EasyMock.expect(taskMaster.getTaskQueue()).andReturn(Optional.of(taskQueue)).anyTimes();
EasyMock.expect(taskMaster.getTaskRunner()).andReturn(Optional.absent()).anyTimes();
EasyMock.expect(taskStorage.getActiveTasks()).andReturn(ImmutableList.of()).anyTimes();
@@ -187,8 +198,8 @@ public void testSubmitTasksFailsIfTaskCannotBeAdded() throws IOException
Set<DataSegment> baseSegments = Sets.newHashSet(createBaseSegments());
Set<DataSegment> derivativeSegments = Sets.newHashSet(createDerivativeSegments());

indexerMetadataStorageCoordinator.commitSegments(baseSegments);
indexerMetadataStorageCoordinator.commitSegments(derivativeSegments);
indexerMetadataStorageCoordinator.commitSegments(baseSegments, null);
indexerMetadataStorageCoordinator.commitSegments(derivativeSegments, null);
EasyMock.expect(taskMaster.getTaskQueue()).andReturn(Optional.of(taskQueue)).anyTimes();
EasyMock.expect(taskMaster.getTaskRunner()).andReturn(Optional.absent()).anyTimes();
EasyMock.expect(taskStorage.getActiveTasks()).andReturn(ImmutableList.of()).anyTimes();
@@ -211,7 +222,7 @@ public void testSubmitTasksFailsIfTaskCannotBeAdded() throws IOException
public void testCheckSegmentsAndSubmitTasks() throws IOException
{
Set<DataSegment> baseSegments = Collections.singleton(createBaseSegments().get(0));
indexerMetadataStorageCoordinator.commitSegments(baseSegments);
indexerMetadataStorageCoordinator.commitSegments(baseSegments, null);
EasyMock.expect(taskMaster.getTaskQueue()).andReturn(Optional.of(taskQueue)).anyTimes();
EasyMock.expect(taskMaster.getTaskRunner()).andReturn(Optional.absent()).anyTimes();
EasyMock.expect(taskStorage.getActiveTasks()).andReturn(ImmutableList.of()).anyTimes();
@@ -52,6 +52,8 @@
import org.apache.druid.query.topn.TopNQuery;
import org.apache.druid.query.topn.TopNQueryBuilder;
import org.apache.druid.segment.TestHelper;
import org.apache.druid.segment.metadata.CentralizedDatasourceSchemaConfig;
import org.apache.druid.segment.metadata.SegmentSchemaManager;
import org.apache.druid.segment.realtime.appenderator.SegmentSchemas;
import org.apache.druid.server.coordination.DruidServerMetadata;
import org.apache.druid.server.coordination.ServerType;
@@ -89,12 +91,14 @@ public class DatasourceOptimizerTest extends CuratorTestBase
private IndexerSQLMetadataStorageCoordinator metadataStorageCoordinator;
private BatchServerInventoryView baseView;
private BrokerServerView brokerServerView;
private SegmentSchemaManager segmentSchemaManager;

@Before
public void setUp() throws Exception
{
TestDerbyConnector derbyConnector = derbyConnectorRule.getConnector();
derbyConnector.createDataSourceTable();
derbyConnector.createSegmentSchemasTable();
derbyConnector.createSegmentTable();
MaterializedViewConfig viewConfig = new MaterializedViewConfig();
jsonMapper = TestHelper.makeJsonMapper();
@@ -106,10 +110,18 @@ public void setUp() throws Exception
jsonMapper,
derbyConnector
);
segmentSchemaManager = new SegmentSchemaManager(
derbyConnectorRule.metadataTablesConfigSupplier().get(),
jsonMapper,
derbyConnector
);

metadataStorageCoordinator = new IndexerSQLMetadataStorageCoordinator(
jsonMapper,
derbyConnectorRule.metadataTablesConfigSupplier().get(),
derbyConnector
derbyConnector,
segmentSchemaManager,
CentralizedDatasourceSchemaConfig.create()
);

setupServerAndCurator();
@@ -167,7 +179,7 @@ public void testOptimize() throws InterruptedException
1024 * 1024
);
try {
metadataStorageCoordinator.commitSegments(Sets.newHashSet(segment));
metadataStorageCoordinator.commitSegments(Sets.newHashSet(segment), null);
announceSegmentForServer(druidServer, segment, zkPathsConfig, jsonMapper);
}
catch (IOException e) {
@@ -192,7 +204,7 @@ public void testOptimize() throws InterruptedException
1024
);
try {
metadataStorageCoordinator.commitSegments(Sets.newHashSet(segment));
metadataStorageCoordinator.commitSegments(Sets.newHashSet(segment), null);
announceSegmentForServer(druidServer, segment, zkPathsConfig, jsonMapper);
}
catch (IOException e) {
@@ -27,6 +27,7 @@
import org.apache.druid.metadata.MetadataStorageConnectorConfig;
import org.apache.druid.metadata.MetadataStorageTablesConfig;
import org.apache.druid.metadata.SQLMetadataConnector;
import org.apache.druid.segment.metadata.CentralizedDatasourceSchemaConfig;
import org.skife.jdbi.v2.Binding;
import org.skife.jdbi.v2.ColonPrefixNamedParamStatementRewriter;
import org.skife.jdbi.v2.DBI;
@@ -133,9 +134,13 @@ public class SQLServerConnector extends SQLMetadataConnector
));

@Inject
public SQLServerConnector(Supplier<MetadataStorageConnectorConfig> config, Supplier<MetadataStorageTablesConfig> dbTables)
public SQLServerConnector(
Supplier<MetadataStorageConnectorConfig> config,
Supplier<MetadataStorageTablesConfig> dbTables,
CentralizedDatasourceSchemaConfig centralizedDatasourceSchemaConfig
)
{
super(config, dbTables);
super(config, dbTables, centralizedDatasourceSchemaConfig);

final BasicDataSource datasource = getDatasource();
datasource.setDriverClassLoader(getClass().getClassLoader());
@@ -22,6 +22,7 @@
import com.google.common.base.Suppliers;
import org.apache.druid.metadata.MetadataStorageConnectorConfig;
import org.apache.druid.metadata.MetadataStorageTablesConfig;
import org.apache.druid.segment.metadata.CentralizedDatasourceSchemaConfig;
import org.junit.Assert;
import org.junit.Test;

@@ -38,7 +39,8 @@ public void testIsTransientException()
Suppliers.ofInstance(new MetadataStorageConnectorConfig()),
Suppliers.ofInstance(
MetadataStorageTablesConfig.fromBase(null)
)
),
CentralizedDatasourceSchemaConfig.create()
);

Assert.assertTrue(connector.isTransientException(new SQLException("Resource Failure!", "08DIE")));
@@ -59,7 +61,8 @@ public void testLimitClause()
Suppliers.ofInstance(new MetadataStorageConnectorConfig()),
Suppliers.ofInstance(
MetadataStorageTablesConfig.fromBase(null)
)
),
CentralizedDatasourceSchemaConfig.create()
);
Assert.assertEquals("FETCH NEXT 100 ROWS ONLY", connector.limitClause(100));
}
@@ -1584,9 +1584,9 @@ private static TaskAction<SegmentPublishResult> createAppendAction(
)
{
if (taskLockType.equals(TaskLockType.APPEND)) {
return SegmentTransactionalAppendAction.forSegments(segments);
return SegmentTransactionalAppendAction.forSegments(segments, null);
} else if (taskLockType.equals(TaskLockType.SHARED)) {
return SegmentTransactionalInsertAction.appendAction(segments, null, null);
return SegmentTransactionalInsertAction.appendAction(segments, null, null, null);
} else {
throw DruidException.defensive("Invalid lock type [%s] received for append action", taskLockType);
}
@@ -1598,9 +1598,9 @@ private TaskAction<SegmentPublishResult> createOverwriteAction(
)
{
if (taskLockType.equals(TaskLockType.REPLACE)) {
return SegmentTransactionalReplaceAction.create(segmentsWithTombstones);
return SegmentTransactionalReplaceAction.create(segmentsWithTombstones, null);
} else if (taskLockType.equals(TaskLockType.EXCLUSIVE)) {
return SegmentTransactionalInsertAction.overwriteAction(null, segmentsWithTombstones);
return SegmentTransactionalInsertAction.overwriteAction(null, segmentsWithTombstones, null);
} else {
throw DruidException.defensive("Invalid lock type [%s] received for overwrite action", taskLockType);
}
