Skip to content

Commit

Permalink
Eliminate Periodic Realtime Segment Metadata Queries: Task Now Publis…
Browse files Browse the repository at this point in the history
…h Schema for Seamless Coordinator Updates (#15475)

The initial step in optimizing segment metadata was to centralize the construction of datasource schema in the Coordinator (#14985). The next goal is to eliminate the need to periodically execute segment metadata queries to obtain segment schema information. This work covers both realtime and finalized segments.

This modification specifically addresses the issue with realtime segments. Tasks will now routinely communicate the schema for realtime segments during the segment announcement process. The Coordinator will identify the schema alongside the segment announcement and subsequently update the schema for realtime segments in the metadata cache.
  • Loading branch information
findingrish authored Jan 10, 2024
1 parent 85b8cf9 commit 71f5307
Show file tree
Hide file tree
Showing 65 changed files with 2,416 additions and 104 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -52,6 +52,7 @@
import org.apache.druid.query.topn.TopNQuery;
import org.apache.druid.query.topn.TopNQueryBuilder;
import org.apache.druid.segment.TestHelper;
import org.apache.druid.segment.realtime.appenderator.SegmentSchemas;
import org.apache.druid.server.coordination.DruidServerMetadata;
import org.apache.druid.server.coordination.ServerType;
import org.apache.druid.server.initialization.ZkPathsConfig;
Expand Down Expand Up @@ -291,6 +292,12 @@ public CallbackAction segmentViewInitialized()
{
return callback.segmentViewInitialized();
}

/**
 * Callback for announced segment schemas.
 *
 * This implementation does not use {@code segmentSchemas}; it always returns
 * {@link CallbackAction#CONTINUE}.
 *
 * @param segmentSchemas the announced schemas (ignored here)
 * @return always {@code CallbackAction.CONTINUE}
 */
@Override
public CallbackAction segmentSchemasAnnounced(SegmentSchemas segmentSchemas)
{
// NOTE(review): presumably CONTINUE keeps this callback registered - confirm against CallbackAction.
return CallbackAction.CONTINUE;
}
}
);
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -56,6 +56,7 @@
import org.apache.druid.segment.loading.DataSegmentPusher;
import org.apache.druid.segment.loading.SegmentCacheManager;
import org.apache.druid.segment.loading.SegmentLoaderConfig;
import org.apache.druid.segment.metadata.CentralizedDatasourceSchemaConfig;
import org.apache.druid.segment.realtime.appenderator.AppenderatorsManager;
import org.apache.druid.segment.realtime.appenderator.UnifiedIndexerAppenderatorsManager;
import org.apache.druid.segment.realtime.firehose.ChatHandlerProvider;
Expand Down Expand Up @@ -130,6 +131,7 @@ public class TaskToolbox

private final TaskLogPusher taskLogPusher;
private final String attemptId;
private final CentralizedDatasourceSchemaConfig centralizedDatasourceSchemaConfig;

public TaskToolbox(
SegmentLoaderConfig segmentLoaderConfig,
Expand Down Expand Up @@ -171,7 +173,8 @@ public TaskToolbox(
ParallelIndexSupervisorTaskClientProvider supervisorTaskClientProvider,
ShuffleClient shuffleClient,
TaskLogPusher taskLogPusher,
String attemptId
String attemptId,
CentralizedDatasourceSchemaConfig centralizedDatasourceSchemaConfig
)
{
this.segmentLoaderConfig = segmentLoaderConfig;
Expand Down Expand Up @@ -215,6 +218,7 @@ public TaskToolbox(
this.shuffleClient = shuffleClient;
this.taskLogPusher = taskLogPusher;
this.attemptId = attemptId;
this.centralizedDatasourceSchemaConfig = centralizedDatasourceSchemaConfig;
}

public SegmentLoaderConfig getSegmentLoaderConfig()
Expand Down Expand Up @@ -487,6 +491,11 @@ public RuntimeInfo getAdjustedRuntimeInfo()
return createAdjustedRuntimeInfo(JvmUtils.getRuntimeInfo(), appenderatorsManager);
}

/**
 * Returns the {@link CentralizedDatasourceSchemaConfig} this toolbox was constructed with.
 *
 * Note that the accessor says "Table" while the type and the backing field say "Datasource";
 * both refer to the same config object.
 *
 * @return the value passed to the constructor, returned as-is (no null check is done here)
 */
public CentralizedDatasourceSchemaConfig getCentralizedTableSchemaConfig()
{
return centralizedDatasourceSchemaConfig;
}

/**
* Create {@link AdjustedRuntimeInfo} based on the given {@link RuntimeInfo} and {@link AppenderatorsManager}. This
* is a way to allow code to properly apportion the amount of processors and heap available to the entire JVM.
Expand Down Expand Up @@ -553,6 +562,7 @@ public static class Builder
private ShuffleClient shuffleClient;
private TaskLogPusher taskLogPusher;
private String attemptId;
private CentralizedDatasourceSchemaConfig centralizedDatasourceSchemaConfig;

public Builder()
{
Expand Down Expand Up @@ -598,6 +608,7 @@ public Builder(TaskToolbox other)
this.intermediaryDataManager = other.intermediaryDataManager;
this.supervisorTaskClientProvider = other.supervisorTaskClientProvider;
this.shuffleClient = other.shuffleClient;
this.centralizedDatasourceSchemaConfig = other.centralizedDatasourceSchemaConfig;
}

public Builder config(final SegmentLoaderConfig segmentLoaderConfig)
Expand Down Expand Up @@ -840,6 +851,12 @@ public Builder attemptId(final String attemptId)
return this;
}

/**
 * Sets the {@link CentralizedDatasourceSchemaConfig} that {@link #build()} passes as the last
 * constructor argument of {@link TaskToolbox}.
 *
 * @param centralizedDatasourceSchemaConfig config to store on this builder; stored as-is
 * @return this builder, so calls can be chained
 */
public Builder centralizedTableSchemaConfig(final CentralizedDatasourceSchemaConfig centralizedDatasourceSchemaConfig)
{
this.centralizedDatasourceSchemaConfig = centralizedDatasourceSchemaConfig;
return this;
}

public TaskToolbox build()
{
return new TaskToolbox(
Expand Down Expand Up @@ -882,7 +899,8 @@ public TaskToolbox build()
supervisorTaskClientProvider,
shuffleClient,
taskLogPusher,
attemptId
attemptId,
centralizedDatasourceSchemaConfig
);
}
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -57,6 +57,7 @@
import org.apache.druid.segment.loading.DataSegmentMover;
import org.apache.druid.segment.loading.DataSegmentPusher;
import org.apache.druid.segment.loading.SegmentLoaderConfig;
import org.apache.druid.segment.metadata.CentralizedDatasourceSchemaConfig;
import org.apache.druid.segment.realtime.appenderator.AppenderatorsManager;
import org.apache.druid.segment.realtime.firehose.ChatHandlerProvider;
import org.apache.druid.server.DruidNode;
Expand Down Expand Up @@ -114,6 +115,7 @@ public class TaskToolboxFactory
private final ShuffleClient shuffleClient;
private final TaskLogPusher taskLogPusher;
private final String attemptId;
private final CentralizedDatasourceSchemaConfig centralizedDatasourceSchemaConfig;

@Inject
public TaskToolboxFactory(
Expand Down Expand Up @@ -155,7 +157,8 @@ public TaskToolboxFactory(
ParallelIndexSupervisorTaskClientProvider supervisorTaskClientProvider,
ShuffleClient shuffleClient,
TaskLogPusher taskLogPusher,
@AttemptId String attemptId
@AttemptId String attemptId,
CentralizedDatasourceSchemaConfig centralizedDatasourceSchemaConfig
)
{
this.segmentLoaderConfig = segmentLoadConfig;
Expand Down Expand Up @@ -197,6 +200,7 @@ public TaskToolboxFactory(
this.shuffleClient = shuffleClient;
this.taskLogPusher = taskLogPusher;
this.attemptId = attemptId;
this.centralizedDatasourceSchemaConfig = centralizedDatasourceSchemaConfig;
}

public TaskToolbox build(Task task)
Expand Down Expand Up @@ -260,6 +264,7 @@ public TaskToolbox build(TaskConfig config, Task task)
.shuffleClient(shuffleClient)
.taskLogPusher(taskLogPusher)
.attemptId(attemptId)
.centralizedTableSchemaConfig(centralizedDatasourceSchemaConfig)
.build();
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -794,7 +794,8 @@ private Appenderator newAppenderator(
toolbox.getCachePopulatorStats(),
rowIngestionMeters,
parseExceptionHandler,
isUseMaxMemoryEstimates()
isUseMaxMemoryEstimates(),
toolbox.getCentralizedTableSchemaConfig()
);
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -57,6 +57,7 @@
import org.apache.druid.segment.realtime.FireDepartmentMetrics;
import org.apache.druid.segment.realtime.RealtimeMetricsMonitor;
import org.apache.druid.segment.realtime.SegmentPublisher;
import org.apache.druid.segment.realtime.appenderator.SegmentSchemas;
import org.apache.druid.segment.realtime.firehose.ClippedFirehoseFactory;
import org.apache.druid.segment.realtime.firehose.EventReceiverFirehoseFactory;
import org.apache.druid.segment.realtime.firehose.TimedShutoffFirehoseFactory;
Expand Down Expand Up @@ -310,6 +311,16 @@ public void unannounceSegments(Iterable<DataSegment> segments) throws IOExceptio
}
}
}

/**
 * No-op: this announcer implementation does nothing with segment schemas.
 *
 * @param taskId            ignored
 * @param sinksSchema       ignored
 * @param sinksSchemaChange ignored
 */
@Override
public void announceSegmentSchemas(String taskId, SegmentSchemas sinksSchema, SegmentSchemas sinksSchemaChange)
{
// Empty body: none of the arguments are read.
}

/**
 * No-op: this announcer implementation keeps no segment schema state to remove.
 *
 * @param taskId ignored
 */
@Override
public void removeSegmentSchemasForTask(String taskId)
{
// Empty body: the matching announceSegmentSchemas in this class is also a no-op.
}
};

// NOTE: getVersion will block if there is lock contention, which will block plumber.getSink
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -206,7 +206,8 @@ public Appenderator newAppenderator(
toolbox.getCachePopulatorStats(),
rowIngestionMeters,
parseExceptionHandler,
isUseMaxMemoryEstimates()
isUseMaxMemoryEstimates(),
toolbox.getCentralizedTableSchemaConfig()
);
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -50,6 +50,7 @@
import org.apache.druid.segment.loading.DataSegmentPusher;
import org.apache.druid.segment.loading.SegmentLoaderConfig;
import org.apache.druid.segment.loading.SegmentLocalCacheManager;
import org.apache.druid.segment.metadata.CentralizedDatasourceSchemaConfig;
import org.apache.druid.segment.realtime.appenderator.AppenderatorsManager;
import org.apache.druid.segment.realtime.appenderator.UnifiedIndexerAppenderatorsManager;
import org.apache.druid.segment.realtime.firehose.NoopChatHandlerProvider;
Expand Down Expand Up @@ -155,7 +156,8 @@ public void setUp() throws IOException
null,
null,
null,
"1"
"1",
CentralizedDatasourceSchemaConfig.create()
);
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -120,6 +120,7 @@
import org.apache.druid.segment.indexing.RealtimeIOConfig;
import org.apache.druid.segment.indexing.granularity.UniformGranularitySpec;
import org.apache.druid.segment.join.NoopJoinableFactory;
import org.apache.druid.segment.metadata.CentralizedDatasourceSchemaConfig;
import org.apache.druid.segment.realtime.firehose.NoopChatHandlerProvider;
import org.apache.druid.segment.transform.ExpressionTransform;
import org.apache.druid.segment.transform.TransformSpec;
Expand Down Expand Up @@ -1644,7 +1645,8 @@ public void close()
null,
null,
null,
"1"
"1",
CentralizedDatasourceSchemaConfig.create()
);
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -108,6 +108,7 @@
import org.apache.druid.segment.indexing.RealtimeTuningConfig;
import org.apache.druid.segment.indexing.granularity.UniformGranularitySpec;
import org.apache.druid.segment.join.NoopJoinableFactory;
import org.apache.druid.segment.metadata.CentralizedDatasourceSchemaConfig;
import org.apache.druid.segment.realtime.FireDepartment;
import org.apache.druid.segment.realtime.firehose.NoopChatHandlerProvider;
import org.apache.druid.segment.realtime.plumber.ServerTimeRejectionPolicyFactory;
Expand Down Expand Up @@ -1017,7 +1018,8 @@ public void close()
null,
null,
null,
"1"
"1",
CentralizedDatasourceSchemaConfig.create()
);

return toolboxFactory.build(task);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -37,6 +37,7 @@
import org.apache.druid.segment.join.JoinableFactory;
import org.apache.druid.segment.loading.DataSegmentPusher;
import org.apache.druid.segment.loading.SegmentLoaderConfig;
import org.apache.druid.segment.metadata.CentralizedDatasourceSchemaConfig;
import org.apache.druid.segment.realtime.FireDepartmentMetrics;
import org.apache.druid.segment.realtime.appenderator.Appenderator;
import org.apache.druid.segment.realtime.appenderator.AppenderatorConfig;
Expand Down Expand Up @@ -70,7 +71,8 @@ public Appenderator createRealtimeAppenderatorForTask(
CachePopulatorStats cachePopulatorStats,
RowIngestionMeters rowIngestionMeters,
ParseExceptionHandler parseExceptionHandler,
boolean useMaxMemoryEstimates
boolean useMaxMemoryEstimates,
CentralizedDatasourceSchemaConfig centralizedDatasourceSchemaConfig
)
{
realtimeAppenderator = Appenderators.createRealtime(
Expand All @@ -93,7 +95,8 @@ public Appenderator createRealtimeAppenderatorForTask(
cachePopulatorStats,
rowIngestionMeters,
parseExceptionHandler,
useMaxMemoryEstimates
useMaxMemoryEstimates,
centralizedDatasourceSchemaConfig
);
return realtimeAppenderator;
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -50,6 +50,7 @@
import org.apache.druid.segment.loading.NoopDataSegmentKiller;
import org.apache.druid.segment.loading.NoopDataSegmentMover;
import org.apache.druid.segment.loading.NoopDataSegmentPusher;
import org.apache.druid.segment.metadata.CentralizedDatasourceSchemaConfig;
import org.apache.druid.segment.realtime.firehose.NoopChatHandlerProvider;
import org.apache.druid.server.DruidNode;
import org.apache.druid.server.SetAndVerifyContextQueryRunner;
Expand Down Expand Up @@ -135,7 +136,8 @@ public void setup() throws IOException
null,
null,
null,
"1"
"1",
CentralizedDatasourceSchemaConfig.create()
);
runner = new SingleTaskBackgroundRunner(
toolboxFactory,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -137,6 +137,7 @@
import org.apache.druid.segment.loading.LocalDataSegmentKiller;
import org.apache.druid.segment.loading.LocalDataSegmentPusherConfig;
import org.apache.druid.segment.loading.NoopDataSegmentArchiver;
import org.apache.druid.segment.metadata.CentralizedDatasourceSchemaConfig;
import org.apache.druid.segment.realtime.FireDepartment;
import org.apache.druid.segment.realtime.FireDepartmentTest;
import org.apache.druid.segment.realtime.appenderator.AppenderatorsManager;
Expand Down Expand Up @@ -665,7 +666,8 @@ public void announceSegment(DataSegment segment)
null,
null,
null,
"1"
"1",
CentralizedDatasourceSchemaConfig.create()
);
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -55,6 +55,7 @@
import org.apache.druid.segment.loading.DataSegmentKiller;
import org.apache.druid.segment.loading.DataSegmentMover;
import org.apache.druid.segment.loading.DataSegmentPusher;
import org.apache.druid.segment.metadata.CentralizedDatasourceSchemaConfig;
import org.apache.druid.segment.realtime.appenderator.AppenderatorsManager;
import org.apache.druid.segment.realtime.firehose.ChatHandlerProvider;
import org.apache.druid.segment.writeout.OnHeapMemorySegmentWriteOutMediumFactory;
Expand Down Expand Up @@ -115,7 +116,8 @@ public TestTaskToolboxFactory(
bob.supervisorTaskClientProvider,
bob.shuffleClient,
bob.taskLogPusher,
bob.attemptId
bob.attemptId,
bob.centralizedDatasourceSchemaConfig
);
}

Expand Down Expand Up @@ -159,6 +161,7 @@ public static class Builder
private ShuffleClient shuffleClient;
private TaskLogPusher taskLogPusher;
private String attemptId;
private CentralizedDatasourceSchemaConfig centralizedDatasourceSchemaConfig;

public Builder setConfig(TaskConfig config)
{
Expand Down Expand Up @@ -387,5 +390,10 @@ public Builder setAttemptId(String attemptId)
this.attemptId = attemptId;
return this;
}

/**
 * Sets the {@link CentralizedDatasourceSchemaConfig} that the {@code TestTaskToolboxFactory}
 * constructor reads from this builder.
 *
 * Returns the builder, like the other setters of this class (for example
 * {@code setConfig} and {@code setAttemptId}), so that this call can take part in a fluent chain.
 * Callers that invoke it as a plain statement and ignore the result are unaffected.
 *
 * @param centralizedDatasourceSchemaConfig config to store on this builder; stored as-is
 * @return this builder, so calls can be chained
 */
public Builder setCentralizedTableSchemaConfig(CentralizedDatasourceSchemaConfig centralizedDatasourceSchemaConfig)
{
this.centralizedDatasourceSchemaConfig = centralizedDatasourceSchemaConfig;
return this;
}
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -116,6 +116,7 @@
import org.apache.druid.segment.loading.DataSegmentPusher;
import org.apache.druid.segment.loading.LocalDataSegmentPusher;
import org.apache.druid.segment.loading.LocalDataSegmentPusherConfig;
import org.apache.druid.segment.metadata.CentralizedDatasourceSchemaConfig;
import org.apache.druid.segment.realtime.appenderator.StreamAppenderator;
import org.apache.druid.segment.realtime.firehose.NoopChatHandlerProvider;
import org.apache.druid.server.DruidNode;
Expand Down Expand Up @@ -700,7 +701,8 @@ public void close()
null,
null,
null,
"1"
"1",
CentralizedDatasourceSchemaConfig.create()
);
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,7 @@
package org.apache.druid.indexing.test;

import com.google.common.collect.Sets;
import org.apache.druid.segment.realtime.appenderator.SegmentSchemas;
import org.apache.druid.server.coordination.DataSegmentAnnouncer;
import org.apache.druid.timeline.DataSegment;

Expand Down Expand Up @@ -57,4 +58,13 @@ public void unannounceSegments(Iterable<DataSegment> segments)
}
}

/**
 * No-op: this announcer does not record or forward segment schemas.
 *
 * @param taskId               ignored
 * @param segmentSchemas       ignored
 * @param segmentSchemasChange ignored
 */
@Override
public void announceSegmentSchemas(String taskId, SegmentSchemas segmentSchemas, SegmentSchemas segmentSchemasChange)
{
// Empty body: none of the arguments are read.
}

/**
 * No-op: this announcer keeps no segment schema state, so there is nothing to remove.
 *
 * @param taskId ignored
 */
@Override
public void removeSegmentSchemasForTask(String taskId)
{
// Empty body: the matching announceSegmentSchemas in this class is also a no-op.
}
}
Loading

0 comments on commit 71f5307

Please sign in to comment.