Introduce Segment Schema Publishing and Polling for Efficient Datasource Schema Building #15817
…add javadocs for classes
…segment polled from coordinator
….coordinator.centralizedSchemaManagement.enabled
… for broker-coordinator communication
…in segment metadata query. Changes in BrokerSegmentMetadataCache to refresh even if no new segments are added to the inventory.
Left a comment. The PR seems very close to merge. I would review the SQL changes again.
@@ -192,7 +193,8 @@ public Pair<Integer, ReadableInput> apply(ReadableInput readableInput)
frameContext.indexMerger(),
meters,
parseExceptionHandler,
-true
+true,
+CentralizedDatasourceSchemaConfig.create(false)
MSQ does not support centralized data source schema yet. I think we should put this comment here.
Better to have this comment in the javadoc of CentralizedDatasourceSchemaConfig itself.
@@ -220,31 +217,10 @@ protected List<? extends Module> getModules()
@Override
public void configure(Binder binder)
{
MM-less ingestion would need this check.
@@ -197,4 +204,31 @@ public void stop()
return new Child();
}
}

+protected void validateCentralizedDatasourceSchemaConfig(Properties properties)
This can be static as well.
@@ -45,18 +51,27 @@ public FingerprintGenerator(ObjectMapper objectMapper)
/**
* Generates fingerprint or hash string for an object using SHA-256 hash algorithm.
*/
-public String generateFingerprint(Object payload)
+@SuppressWarnings("UnstableApiUsage")
+public String generateFingerprint(SchemaPayload schemaPayload, String dataSource, int version)
This should have unit tests so that we can assert the changes.
Dependency update might cause issues.
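A unit test for the new signature could assert that the fingerprint is deterministic and that it changes when the datasource or the version changes. The sketch below is a simplified stand-in, not the PR's FingerprintGenerator: it assumes the schema has already been serialized to a string, it uses java.security.MessageDigest from the JDK, and the class and method names are hypothetical.

```java
import java.nio.charset.StandardCharsets;
import java.security.MessageDigest;
import java.security.NoSuchAlgorithmException;

/** Hypothetical sketch; not the PR's FingerprintGenerator. */
final class FingerprintSketch
{
  /** Hashes the serialized schema together with the datasource and the version. */
  static String fingerprint(String serializedSchema, String dataSource, int version)
  {
    try {
      MessageDigest digest = MessageDigest.getInstance("SHA-256");
      // A zero byte separates the fields so that ("ab", "c") and ("a", "bc") hash differently.
      digest.update(serializedSchema.getBytes(StandardCharsets.UTF_8));
      digest.update((byte) 0);
      digest.update(dataSource.getBytes(StandardCharsets.UTF_8));
      digest.update((byte) 0);
      digest.update(Integer.toString(version).getBytes(StandardCharsets.UTF_8));

      // Render the 32-byte digest as 64 lowercase hex characters.
      StringBuilder hex = new StringBuilder();
      for (byte b : digest.digest()) {
        hex.append(String.format("%02x", b));
      }
      return hex.toString();
    }
    catch (NoSuchAlgorithmException e) {
      // SHA-256 is a required algorithm on every Java platform, so this is not expected.
      throw new IllegalStateException(e);
    }
  }

  public static void main(String[] args)
  {
    String schema = "{\"columns\":[\"__time\"]}";
    System.out.println(fingerprint(schema, "wiki", 1).length()); // 64
    System.out.println(fingerprint(schema, "wiki", 1).equals(fingerprint(schema, "wiki", 2))); // false
  }
}
```

Assertions of this kind (same input gives the same hash, a different datasource or version gives a different hash) would also catch behaviour changes caused by a dependency update.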
docs/operations/metrics.md
Outdated
@@ -75,6 +75,12 @@ Most metric values reset each emission period, as specified in `druid.monitoring
|`metadatacache/schemaPoll/count`|Number of coordinator polls to fetch datasource schema.||
|`metadatacache/schemaPoll/failed`|Number of failed coordinator polls to fetch datasource schema.||
|`metadatacache/schemaPoll/time`|Time taken for coordinator polls to fetch datasource schema.||
+|`metadatacache/backfill/count`|Number of segments for which schema was back filled in the database.|`dataSource`|
+|`schemacache/realtime/size`|Number of realtime segments for which schema is cached.||Depends on the number of realtime segments.|
+|`schemacache/finalizedSegmentMetadata/size`|Number of finalized segments for which schema metadata is cached.||Depends on the number of segments in the cluster.|
Nit: this should be count and not size. We can do this change as a followup.
*/
SegmentPublishResult commitAppendSegments(
Set<DataSegment> appendSegments,
Map<DataSegment, ReplaceTaskLock> appendSegmentToReplaceLock,
@Nullable MinimalSegmentSchemas minimalSegmentSchemas
String taskAllocatorId,
Why is this required?
There are no javadocs for this.
It is already in master: https://github.com/apache/druid/blob/master/server/src/main/java/org/apache/druid/metadata/IndexerSQLMetadataStorageCoordinator.java#L498
It seems you are looking at a subset of commits from the PR; check this diff: https://github.com/apache/druid/pull/15817/files#diff-519b0b98ee6a12cbb850f2f27fb6947e86e9353c79373ccb8d78d6113d1304b5R551
"Schema version [%d] doesn't match the current version [%d], dropping the schema [%s].",
minimalSegmentSchemas.getSchemaVersion(),
"Schema version [%d] doesn't match the current version [%d]. Not persisting this schema [%s]. "
+ "Schema for this segment will be poppulated by the schema backfill job in Coordinator.",
+ "Schema for this segment will be poppulated by the schema backfill job in Coordinator.",
+ "Schema for this segment will be populated by the schema back-fill job in Coordinator.",
@@ -175,7 +175,7 @@ public void stop()

public void leaderStart()
{
-log.info("%s starting cache initialization.", getClass().getSimpleName());
+log.info("Initializing cache.");
I would recommend adding the name of the class so I can search for "cache %s" in the logs :). It makes it easier for me to search through tons of logs, since each logger can have its own format.
Nit: can be done in a follow-up.
-finalizedSegmentStats = ImmutableMap.of();
-finalizedSegmentSchema.clear();
+finalizedSegmentMetadata = ImmutableMap.of();
+finalizedSegmentSchema = ImmutableMap.of();
Can this cause thread-safety issues if we change the reference?
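On the thread-safety question: reassigning the reference can be safe for readers when the field is volatile and the map is never mutated after it is published. Whether the fields in this PR are declared that way is not visible in this hunk, so the following is only a sketch of the pattern, with hypothetical class, field, and method names.

```java
import java.util.Collections;
import java.util.HashMap;
import java.util.Map;

/** Hypothetical sketch of a snapshot-style cache; not the PR's SegmentSchemaCache. */
final class SnapshotCacheSketch
{
  // volatile: a reassigned reference becomes visible to reader threads.
  private volatile Map<String, Long> finalizedSegmentRows = Collections.emptyMap();

  /** Publishes a new unmodifiable snapshot with a single reference write. */
  void replaceAll(Map<String, Long> polled)
  {
    finalizedSegmentRows = Collections.unmodifiableMap(new HashMap<>(polled));
  }

  /** Resets by swapping in an empty map instead of clearing a shared one. */
  void reset()
  {
    finalizedSegmentRows = Collections.emptyMap();
  }

  Long numRows(String segmentId)
  {
    // Read the field once; a concurrent swap does not affect this local snapshot.
    Map<String, Long> snapshot = finalizedSegmentRows;
    return snapshot.get(segmentId);
  }
}
```

The remaining risk with this pattern is a method that reads the field twice, or reads two separately swapped fields, and expects both reads to come from the same poll.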
@@ -106,10 +108,10 @@ public SegmentSchemaCache(ServiceEmitter emitter)

public void setInitialized()
{
-log.info("[%s] initializing.", getClass().getSimpleName());
+log.info("Initializing SegmentSchemaCache.");
This should have isInitialized() at the top, no?
I have addressed the feedback on the PR. I will raise a follow-up PR to enable schema publish in MSQ and address any feedback meant for later.
Changes LGTM. There are some rough edges that can be taken care of as part of a follow-up PR.
Thanks @findingrish for taking up this monumental effort.
log.error(
"Schema version [%d] doesn't match the current version [%d]. Not persisting this schema [%s]. "
+ "Schema for this segment will be populated by the schema backfill job in Coordinator.",
segmentSchemaMapping.getSchemaVersion(),
This check should be outside the transaction. Let's create a follow-up patch for that.
Set<String> columnsToAdd = new HashSet<>();

for (String columnName : columnNameTypes.keySet()) {
We should add a test case where we are checking this logic.
Note: Followup item.
+1 to what @cryptoe said, I agree that we can refine this as we go but overall the changes seem okay.
Thanks for your patience on this, @findingrish !!
{
log.debug("Updating segment with schema and numRows information: [%s].", batch);

// update schemaId and numRows in segments table
// update schemaId and numRows in segments table
// update fingerprint and numRows in segments table
@@ -1435,6 +1435,7 @@ MiddleManagers pass their configurations down to their child peons. The MiddleMa
|`druid.worker.baseTaskDirs`|List of base temporary working directories, one of which is assigned per task in a round-robin fashion. This property can be used to allow usage of multiple disks for indexing. This property is recommended in place of and takes precedence over `${druid.indexer.task.baseTaskDir}`. If this configuration is not set, `${druid.indexer.task.baseTaskDir}` is used. For example, `druid.worker.baseTaskDirs=[\"PATH1\",\"PATH2\",...]`.|null|
|`druid.worker.baseTaskDirSize`|The total amount of bytes that can be used by tasks on any single task dir. This value is treated symmetrically across all directories, that is, if this is 500 GB and there are 3 `baseTaskDirs`, then each of those task directories is assumed to allow for 500 GB to be used and a total of 1.5 TB will potentially be available across all tasks. The actual amount of memory assigned to each task is discussed in [Configuring task storage sizes](../ingestion/tasks.md#configuring-task-storage-sizes)|`Long.MAX_VALUE`|
|`druid.worker.category`|A string to name the category that the MiddleManager node belongs to.|`_default_worker_category`|
+|`druid.indexer.fork.property.druid.centralizedDatasourceSchema.enabled`| This config should be set when CentralizedDatasourceSchema feature is enabled. |false|
For follow-up PR:
The config description should be more like "Indicates whether centralized schema management is enabled". The description should also link to the page which contains the details of the feature.
@@ -75,6 +75,12 @@ Most metric values reset each emission period, as specified in `druid.monitoring
|`metadatacache/schemaPoll/count`|Number of coordinator polls to fetch datasource schema.||
|`metadatacache/schemaPoll/failed`|Number of failed coordinator polls to fetch datasource schema.||
|`metadatacache/schemaPoll/time`|Time taken for coordinator polls to fetch datasource schema.||
+|`metadatacache/backfill/count`|Number of segments for which schema was back filled in the database.|`dataSource`|
+|`schemacache/realtime/count`|Number of realtime segments for which schema is cached.||Depends on the number of realtime segments.|
For follow-up PR
Do these rows render correctly? The preceding rows have only 3 columns, this one seems to have 4.
|`schemacache/finalizedSegmentMetadata/count`|Number of finalized segments for which schema metadata is cached.||Depends on the number of segments in the cluster.|
|`schemacache/finalizedSchemaPayload/count`|Number of finalized segment schema cached.||Depends on the number of distinct schema in the cluster.|
|`schemacache/inTransitSMQResults/count`|Number of segments for which schema was fetched by executing segment metadata query.||Eventually it should be 0.|
|`schemacache/inTransitSMQPublishedResults/count`|Number of segments for which schema is cached after back filling in the database.||Eventually it should be 0.|
For follow-up PR:
Is schemacache/ not the same as metadatacache? The similar yet different names can be confusing.
for (String column : columns) {
createStatementBuilder.append(column);
createStatementBuilder.append(",");
For follow-up PR:
Nit: We seem to have removed the new line characters. They formatted the statement nicely in case we wanted to debug it.
@@ -905,7 +907,7 @@ private TaskStatus generateAndPublishSegments(
try (final BatchAppenderatorDriver driver = BatchAppenderators.newDriver(appenderator, toolbox, segmentAllocator)) {
driver.startJob();

-SegmentsAndCommitMetadata pushed = InputSourceProcessor.process(
+Pair<SegmentsAndCommitMetadata, SegmentSchemaMapping> commitMetadataAndSchema = InputSourceProcessor.process(
For follow-up PR
Why is SegmentSchemaMapping not included inside the SegmentsAndCommitMetadata object itself?
@@ -58,7 +61,7 @@ public class InputSourceProcessor
*
* @return {@link SegmentsAndCommitMetadata} for the pushed segments.
*/
-public static SegmentsAndCommitMetadata process(
+public static Pair<SegmentsAndCommitMetadata, SegmentSchemaMapping> process(
For follow-up PR
We should include SegmentSchemaMapping inside the SegmentsAndCommitMetadata itself.
? SegmentTransactionalAppendAction.forSegmentsAndMetadata(segmentsToPush, startMetadata, endMetadata,
segmentSchemaMapping
)
: SegmentTransactionalInsertAction.appendAction(segmentsToPush, startMetadata, endMetadata,
segmentSchemaMapping
);
For follow-up PR
Please fix the formatting here.
@@ -110,4 +115,14 @@ public TaskStatus runTask(TaskToolbox toolbox)
{
return status;
}

+public TaskAction<SegmentPublishResult> testBuildPublishAction(
For follow-up PR
public TaskAction<SegmentPublishResult> testBuildPublishAction(
public TaskAction<SegmentPublishResult> buildPublishAction(
@@ -65,7 +65,7 @@ public static void setup() throws IOException
expectedUnusedSegments.add(createSegment(Intervals.of("2017-10-07/2017-10-08"), UNUSED_V1));

actionTestKit.getMetadataStorageCoordinator()
-.commitSegments(expectedUnusedSegments);
+.commitSegments(expectedUnusedSegments, null);
For follow-up PR
Since passing null is a very common usage right now, it would be better to keep two variants of the new methods. It would be easier to identify the usages which pass non-null values and we could also avoid passing nulls all over the place.
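The two-variant idea might look roughly like the sketch below. Everything in it is hypothetical: SchemaMapping stands in for SegmentSchemaMapping, segments are plain strings, and the returned string only makes the chosen path observable. It is not the signature of any Druid method.

```java
import java.util.Objects;
import java.util.Set;

/** Hypothetical sketch of the two-variant idea; names are illustrative only. */
final class CommitOverloadSketch
{
  /** Stand-in for SegmentSchemaMapping. */
  static final class SchemaMapping
  {
    final int schemaVersion;

    SchemaMapping(int schemaVersion)
    {
      this.schemaVersion = schemaVersion;
    }
  }

  /** Variant for callers that have no schema to publish. */
  static String commitSegments(Set<String> segments)
  {
    return commit(segments, null);
  }

  /** Variant for callers that publish a schema; null is rejected here. */
  static String commitSegments(Set<String> segments, SchemaMapping mapping)
  {
    return commit(segments, Objects.requireNonNull(mapping, "mapping"));
  }

  // The only place where an absent schema is represented as null.
  private static String commit(Set<String> segments, SchemaMapping mapping)
  {
    return segments.size() + " segment(s), schema "
           + (mapping == null ? "absent" : "v" + mapping.schemaVersion);
  }
}
```

With this split, a search for callers of the two-argument variant lists exactly the call sites that publish a schema.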
Description
Issue: #14989
The initial step in optimizing segment metadata was to centralize the construction of datasource schema in the Coordinator (#14985). Thereafter, we addressed the problem of publishing schema for realtime segments (#15475). Subsequently, our goal is to eliminate the requirement for regularly executing queries to obtain segment schema information.
This is the final change, which involves publishing the schema of finalized segments from tasks and periodically polling it in the Coordinator.
Design
Database
Schema Table
Table Name: SegmentSchema
Purpose: Store unique schema for segment.
Columns: [column table not preserved; surviving cell fragments: used, segments]
Segments Table
New columns will be added to the already existing Segments table.
Columns: [column table not preserved]
Task
Changes are required in the task to publish schema along with segment metadata.
- SchemaPayload to encapsulate RowSignature and AggregatorFactories.
- SegmentSchemaMetadata to encapsulate SchemaPayload and numRows.
- SegmentSchemaMapping to encapsulate schema and numRows information for multiple segments.
- SegmentInsertAction, SegmentTransactionalReplaceAction, SegmentTransactionalAppendAction & SegmentTransactionalInsertAction to take in segment schema.
Streaming
- StreamAppenderator to get the RowSignature, AggregatorFactories and numRows for the segment.
Batch
- IndexTask
- ParallelIndexSupervisorTask
MSQ
- SegmentGeneratorFrameProcessor to return segment schema along with segment metadata.
- SegmentGeneratorFrameProcessorFactory and ControllerImpl.
Note, these changes are reverted for now.
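How the three new classes relate can be pictured with the sketch below. The shapes are assumptions made for illustration: the row signature and aggregators are reduced to string maps, and the idea that the mapping stores each payload once under a fingerprint is inferred from the review comments about fingerprints, not taken from the PR's code.

```java
import java.util.HashMap;
import java.util.Map;

/** Hypothetical shapes for illustration; field names and types are assumptions. */
final class SchemaModelSketch
{
  static final class SchemaPayload
  {
    final Map<String, String> rowSignature;        // column name -> column type
    final Map<String, String> aggregatorFactories; // column name -> aggregator description

    SchemaPayload(Map<String, String> rowSignature, Map<String, String> aggregatorFactories)
    {
      this.rowSignature = rowSignature;
      this.aggregatorFactories = aggregatorFactories;
    }
  }

  static final class SegmentSchemaMetadata
  {
    final SchemaPayload schemaPayload;
    final long numRows;

    SegmentSchemaMetadata(SchemaPayload schemaPayload, long numRows)
    {
      this.schemaPayload = schemaPayload;
      this.numRows = numRows;
    }
  }

  /** Schema and numRows for many segments; identical payloads are stored once. */
  static final class SegmentSchemaMapping
  {
    final Map<String, Long> segmentIdToNumRows = new HashMap<>();
    final Map<String, String> segmentIdToFingerprint = new HashMap<>();
    final Map<String, SchemaPayload> fingerprintToPayload = new HashMap<>();

    void add(String segmentId, String fingerprint, SegmentSchemaMetadata metadata)
    {
      segmentIdToNumRows.put(segmentId, metadata.numRows);
      segmentIdToFingerprint.put(segmentId, fingerprint);
      // Keep the first payload seen for a fingerprint; later duplicates are not stored again.
      fingerprintToPayload.putIfAbsent(fingerprint, metadata.schemaPayload);
    }
  }
}
```

Under these assumptions, two segments that share a schema add two numRows entries but only one payload entry.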
Overlord
Changes are required in the Overlord (IndexerSQLMetadataStorageCoordinator) to persist the schema along with segment metadata in the database.
Coordinator
Schema Poll
- Changes in SqlSegmentsMetadataManager to poll schema along with segments.
- Also poll schema_id and num_rows from the segments table.
- Update schema cache.
Schema Caching
Maintain a cache of segment schema. Refer to SegmentSchemaCache.
It caches the following information,
SegmentMetadataCache changes
Changes in AbstractSegmentMetadataCache class to add new methods which will be overridden by child classes,
- additionalInitializationCondition
- removeSegmentAction
- segmentMetadataQueryResultHandler
Changes in CoordinatorSegmentMetadataCache to override methods from AbstractSegmentMetadataCache,
- additionalInitializationCondition to wait for the segmentSchemaCache to be initialized.
- removeSegmentAction to remove the schema from the schema cache.
- segmentMetadataQueryResultHandler to additionally publish and cache the schema.
Schema Backfill
Added a new class SegmentSchemaBackFillQueue which accepts segment schemas and publishes them in batches.
Schema Cleanup
CoordinatorDuty to clean up schema which is not referenced by any segment.
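The accept-then-publish-in-batch behaviour described for the backfill queue can be sketched as below. This is a generic illustration, not SegmentSchemaBackFillQueue itself: the queue holds plain segment id strings, the batch size and the publisher callback are hypothetical parameters, and scheduling, retries, and metrics are left out.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.function.Consumer;

/** Hypothetical sketch of a queue that publishes its entries in batches. */
final class BackfillQueueSketch
{
  private final BlockingQueue<String> queue = new LinkedBlockingQueue<>();
  private final int maxBatchSize;
  private final Consumer<List<String>> publisher;

  BackfillQueueSketch(int maxBatchSize, Consumer<List<String>> publisher)
  {
    this.maxBatchSize = maxBatchSize;
    this.publisher = publisher;
  }

  /** Called by producers, for example after a segment metadata query returns a schema. */
  void add(String segmentId)
  {
    queue.add(segmentId);
  }

  /** Drains everything queued so far and publishes it batch by batch; returns the batch count. */
  int processBatches()
  {
    int batches = 0;
    List<String> batch = new ArrayList<>();
    // drainTo moves at most maxBatchSize entries and returns how many it moved.
    while (queue.drainTo(batch, maxBatchSize) > 0) {
      publisher.accept(new ArrayList<>(batch));
      batch.clear();
      batches++;
    }
    return batches;
  }
}
```

With a batch size of 2 and five queued entries, one call to processBatches hands the publisher batches of 2, 2, and 1 entries.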
Coordinator leader flow changes
- CoordinatorSegmentMetadataCache refresh is executed only on the leader node.
- CoordinatorSegmentMetadataCache timeline callback continues to function on all Coordinator nodes.
- SegmentSchemaCache is populated only on the leader node, except for the realtime schema information, which is updated on all Coordinator nodes.
- SegmentSchemaBackFillQueue functions only on the leader node.
Testing
- centralized-table-schema runs successfully.
Upgrade considerations
The general upgrade order should be followed. The new code is behind a feature flag, so it is compatible with existing setups. Tasks with the new changes can communicate with an old version of the Overlord.
Release Notes
This feature addresses multiple challenges outlined in the linked issue. To enable it, set druid.centralizedDatasourceSchema.enabled.
If MM is used, then set druid.indexer.fork.property.druid.centralizedDatasourceSchema.enabled.
When the feature is enabled,
To roll back, simply turn off the feature flag. The database schema change is not rolled back on turning off the feature.
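The relationship between the two flags can be expressed as a small static check. This is a hypothetical helper written for illustration: the two property names come from the notes above, but the rule it enforces (the fork property must be on whenever the main flag is on) is an assumption about MiddleManager setups, and it is not the PR's validateCentralizedDatasourceSchemaConfig.

```java
import java.util.Properties;

/** Hypothetical consistency check for MiddleManager-based setups. */
final class SchemaFlagCheckSketch
{
  static final String FLAG = "druid.centralizedDatasourceSchema.enabled";
  static final String FORK_FLAG = "druid.indexer.fork.property." + FLAG;

  /** Returns false when the feature is enabled but the fork property is not. */
  static boolean isConsistentForMiddleManager(Properties properties)
  {
    // Both flags are treated as false when absent.
    boolean enabled = Boolean.parseBoolean(properties.getProperty(FLAG, "false"));
    boolean forkEnabled = Boolean.parseBoolean(properties.getProperty(FORK_FLAG, "false"));
    return !enabled || forkEnabled;
  }
}
```

A check like this takes only the Properties argument and no instance state, which is the kind of method that can be declared static.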
New configs,
- druid.coordinator.kill.segmentSchema.on
- druid.coordinator.kill.segmentSchema.period
- druid.coordinator.kill.segmentSchema.durationToRetain
Important metrics to track,
- metadatacache/schemaPoll/count
- metadatacache/schemaPoll/failed
- metadatacache/schemaPoll/time
- metadatacache/init/time
- metadatacache/refresh/count
- metadatacache/refresh/time
- metadatacache/backfill/count
- schemacache/realtime/size
- schemacache/finalizedSegmentMetadata/size
- schemacache/finalizedSchemaPayload/size
- schemacache/inTransitSMQResults/size
- schemacache/inTransitSMQPublishedResults/size
This PR has: