Introduce Segment Schema Publishing and Polling for Efficient Datasource Schema Building #15817

Merged on Apr 24, 2024 (281 commits)
Changes shown below are from 1 commit.

Commits
33a8dd5
Remove enabled field from SegmentMetadataCacheConfig
findingrish Sep 12, 2023
7a7ca55
Add class to manage druid table information in SegmentMetadataCache, …
findingrish Sep 13, 2023
eb6a145
Merge remote-tracking branch 'origin/master' into coordinator_builds_…
findingrish Sep 13, 2023
b9fb83d
Minor refactoring in SegmentMetadataCache
findingrish Sep 13, 2023
aa2bfe7
Make SegmentMetadataCache generic
findingrish Sep 13, 2023
e97dcda
Add a generic abstract class for segment metadata cache
findingrish Sep 13, 2023
7badce1
Rename SegmentMetadataCache to CoordinatorSegmentMetadataCache
findingrish Sep 13, 2023
25cdce6
Rename PhysicalDataSourceMetadataBuilder to PhysicalDataSourceMetadat…
findingrish Sep 13, 2023
5f5ad18
Fix json property key name in DataSourceInformation
findingrish Sep 13, 2023
08e949e
Add validation in MetadataResource#getAllUsedSegments, update javadocs
findingrish Sep 14, 2023
80fc09d
Minor changes
findingrish Sep 14, 2023
4217cd8
Minor change
findingrish Sep 14, 2023
8b7e483
Merge remote-tracking branch 'upstream/master' into coordinator_build…
findingrish Sep 14, 2023
d6ac350
Update base property name for query config classes in Coordinator
findingrish Sep 14, 2023
533236b
Log ds schema change when polling from coordinator
findingrish Sep 15, 2023
70f0888
update the logic to determine is_active status in segments table for …
findingrish Sep 15, 2023
a176bfe
Merge remote-tracking branch 'upstream/master' into coordinator_build…
findingrish Sep 15, 2023
b32dfd6
Update the logic to set numRows in the sys segments table, add comments
findingrish Sep 15, 2023
17417b5
Rename config druid.coordinator.segmentMetadataCache.enabled to druid…
findingrish Sep 15, 2023
6a395a9
Merge remote-tracking branch 'upstream/master' into coordinator_build…
findingrish Sep 18, 2023
907ace3
Report cache init time irrespective of the awaitInitializationOnStart…
findingrish Sep 20, 2023
cf68c38
Merge remote-tracking branch 'upstream/master' into coordinator_build…
findingrish Sep 20, 2023
441f37a
Report metric for fetching schema from coordinator
findingrish Sep 20, 2023
bd5b048
Add auth check in api to return dataSourceInformation, report metrics…
findingrish Sep 21, 2023
933d8d1
Fix bug in Coordinator api to return dataSourceInformation
findingrish Sep 21, 2023
9e7e364
Minor change
findingrish Sep 21, 2023
e7356ce
Merge remote-tracking branch 'upstream/master' into coordinator_build…
findingrish Sep 22, 2023
5d16148
Address comments around docs, minor renaming
findingrish Sep 23, 2023
d8884be
Remove null check from MetadataResource#getDataSourceInformation
findingrish Sep 23, 2023
0f0805a
Merge remote-tracking branch 'upstream/master' into coordinator_build…
findingrish Sep 23, 2023
e129d3e
Install cache module in Coordinator, if feature is enabled and beOver…
findingrish Sep 25, 2023
b4042c6
Merge remote-tracking branch 'upstream/master' into coordinator_build…
findingrish Sep 29, 2023
01c27c9
Minor change in QueryableCoordinatorServerView
findingrish Sep 29, 2023
87c9873
Remove QueryableCoordinatorServerView, add a new QuerySegmentWalker i…
findingrish Oct 14, 2023
971b347
Merge remote-tracking branch 'upstream/master' into coordinator_build…
findingrish Oct 14, 2023
89d3845
fix build
findingrish Oct 14, 2023
270dbd5
fix build
findingrish Oct 14, 2023
2da23b8
Fix spelling, intellij-inspection, codeql bug
findingrish Oct 14, 2023
6f568a6
undo some changes in CachingClusteredClientTest
findingrish Oct 14, 2023
fe229c0
minor changes
findingrish Oct 14, 2023
473b25c
Fix typo in metric name
findingrish Oct 14, 2023
39fb248
temporarily enable feature on ITs
findingrish Oct 15, 2023
cac695a
fix checkstyle issue
findingrish Oct 15, 2023
eb3e3c1
Changes in CliCoordinator to conditionally add segment metadata cache…
findingrish Oct 15, 2023
30438f4
temporary changes to debug IT failure
findingrish Oct 16, 2023
e88ad00
revert temporary changes in gha
findingrish Oct 16, 2023
61d130b
revert temporary changes to run ITs with this feature
findingrish Oct 16, 2023
2e4c45b
update docs with the config for enabling feature
findingrish Oct 16, 2023
255cf2c
update docs with the config for enabling feature
findingrish Oct 16, 2023
1a6dfc5
Add IT for the feature
findingrish Oct 17, 2023
a961501
Merge remote-tracking branch 'upstream/master' into coordinator_build…
findingrish Oct 17, 2023
2e65726
Merge branch 'coordinator_builds_ds_schema' of github.com:findingrish…
findingrish Oct 17, 2023
4b51c42
Changes in BrokerSegmentMetadataCache to poll schema for all the loca…
findingrish Oct 20, 2023
a4e2097
Address review comments
findingrish Oct 26, 2023
3ca03c9
Merge remote-tracking branch 'upstream/master' into coordinator_build…
findingrish Oct 26, 2023
902abd3
Run DruidSchemaInternRowSignatureBenchmark using BrokerSegmentMetadat…
findingrish Oct 26, 2023
bcab458
Address feedback
findingrish Oct 26, 2023
32a4065
Simplify logic for setting isRealtime in sys segments table
findingrish Oct 26, 2023
6b04ee7
Remove forbidden api invocation
findingrish Oct 26, 2023
cb93e43
Merge remote-tracking branch 'upstream/master' into coordinator_build…
findingrish Oct 26, 2023
152c480
Debug log when coordinator poll fails
findingrish Oct 26, 2023
80c6d26
Fix CoordinatorSegmentMetadataCacheTest
findingrish Oct 27, 2023
9cca98e
Merge remote-tracking branch 'upstream/master' into coordinator_build…
findingrish Oct 28, 2023
bb69cde
Minor changes
findingrish Oct 28, 2023
a604e65
Initial class for persisting and retrieving segment schema
findingrish Nov 6, 2023
f2d8a5e
Merge remote-tracking branch 'upstream/master' into coordinator_schem…
findingrish Nov 6, 2023
37ac4a7
Merge remote-tracking branch 'upstream/master' into coordinator_schem…
findingrish Dec 12, 2023
629f3ad
fix conflicts
findingrish Dec 12, 2023
af34c34
Revert changes in SegmentLoadInfo
findingrish Dec 12, 2023
4a4f1b0
Remove segmentSchemaMapping table
findingrish Dec 12, 2023
b142b29
Initial commit
findingrish Dec 15, 2023
035a884
Outline
findingrish Jan 10, 2024
7935349
Revert some changes
findingrish Jan 10, 2024
26ee22f
Merge remote-tracking branch 'upstream/master' into coordinator_schem…
findingrish Jan 10, 2024
1555592
minor changes
findingrish Jan 10, 2024
ccd2c42
Merge remote-tracking branch 'upstream/master' into coordinator_schem…
findingrish Jan 10, 2024
900dad5
Minor changes
findingrish Jan 10, 2024
45423fc
Add kill task, update schema persist code
findingrish Jan 14, 2024
33cfb75
Initial task side changes to publish schema
findingrish Jan 15, 2024
6c7e0d5
Changes to publish schema for streaming task
findingrish Jan 21, 2024
85594a7
Update schema table
findingrish Jan 22, 2024
72f671c
Fix build
findingrish Jan 23, 2024
ecc8fc4
Changes to publish realtime segment schema, poll, cache and use it fo…
findingrish Jan 31, 2024
566d09b
Publish schema for batch task
findingrish Feb 6, 2024
c81b90b
Remove design file
findingrish Feb 6, 2024
3fb3da5
Add MinimalSegmentSchemas class to pass segment schema from task to o…
findingrish Feb 8, 2024
8c7bd0e
Fix build
findingrish Feb 9, 2024
9fc3f0f
Merge remote-tracking branch 'upstream/master' into coordinator_schem…
findingrish Feb 9, 2024
6208486
Revert some changes
findingrish Feb 9, 2024
96ec515
Fix build KillUnusedSegmentsTaskTest
findingrish Feb 9, 2024
3f7e9c0
Fix intellij-inspection failures
findingrish Feb 9, 2024
f9dde90
checkstyle
findingrish Feb 9, 2024
034d1ef
Add more UTs
findingrish Feb 12, 2024
ee303da
Add test to verify changes in CoordinatorSegmentMetadataCache
findingrish Feb 13, 2024
1d1e983
Fix test failures in indexing and server module
findingrish Feb 13, 2024
500c375
Fix checkstyle
findingrish Feb 14, 2024
d9bc9bc
Fix DatasourceOptimizerTest
findingrish Feb 14, 2024
9e89aed
Fix ConcurrentReplaceAndAppendTest
findingrish Feb 14, 2024
6210504
Revert some change
findingrish Feb 14, 2024
a0d2886
Fix KinesisIndexTaskTest
findingrish Feb 14, 2024
88826f7
Test to up the coverage
findingrish Feb 14, 2024
8e7539e
Minor change
findingrish Feb 14, 2024
a3343ff
checkstyle
findingrish Feb 14, 2024
9852f9a
Fix test
findingrish Feb 15, 2024
fbf1189
Merge remote-tracking branch 'upstream/master' into coordinator_schem…
findingrish Feb 15, 2024
d1f3180
Create schema table and alter segments table only when feature is ena…
findingrish Feb 16, 2024
ffc8e06
Fix test
findingrish Feb 17, 2024
cede729
Run KillUnreferencedSegmentSchemas duty only when the feature is enabled
findingrish Feb 19, 2024
264b19e
Merge remote-tracking branch 'upstream/master' into coordinator_schem…
findingrish Feb 19, 2024
052d026
Fix sql based ingestion
findingrish Feb 19, 2024
96c05b1
Fix issues in passing feature flag to task
findingrish Feb 20, 2024
17ca156
Temp changes
findingrish Feb 20, 2024
0b49546
Retrigger build
findingrish Feb 20, 2024
5607196
MSQ side changes to publish segment schema
findingrish Feb 21, 2024
ab5c0cf
Fix bug in segment table creation
findingrish Feb 21, 2024
f417abb
checkstyle
findingrish Feb 21, 2024
0ac4c83
Build segment schema before the directory is removed
findingrish Feb 21, 2024
d8cd78e
Remove logging
findingrish Feb 21, 2024
b66a1f0
Fix centralized-datasource-schema IT
findingrish Feb 21, 2024
b1b9529
Temp changes
findingrish Feb 22, 2024
e8a6d9b
run all IT with the feature
findingrish Feb 22, 2024
7cf889d
test changes
findingrish Feb 23, 2024
e0e97ba
Add docs, checkstyle, minor code changes
findingrish Feb 25, 2024
9175e93
Revert changes in integration-tests-ex
findingrish Feb 25, 2024
aa52df0
Revert changes in examples
findingrish Feb 25, 2024
9fdeedd
Revert change in MetadataStorageUpdaterJobSpec
findingrish Feb 25, 2024
fd4e9f6
Revert changes in integration-tests environment configs
findingrish Feb 25, 2024
e86131c
Minor code changes
findingrish Feb 26, 2024
5d55539
Merge remote-tracking branch 'upstream/master' into coordinator_schem…
findingrish Mar 1, 2024
5a19650
Merge remote-tracking branch 'upstream/master' into coordinator_schem…
findingrish Mar 5, 2024
348a72c
CoordinatorSegmentMetadataCache refresh is executed only on leader node
findingrish Mar 5, 2024
5e9e58f
Merge remote-tracking branch 'upstream/master' into coordinator_schem…
findingrish Mar 11, 2024
f2e9488
Add debug logging
findingrish Mar 11, 2024
564e038
Fix build
findingrish Mar 12, 2024
88b0146
Merge remote-tracking branch 'upstream/master' into coordinator_schem…
findingrish Mar 12, 2024
2ad6639
Fix build
findingrish Mar 12, 2024
c7980b0
Add exception handling in /druid/coordinator/v1/metadata/segments api
findingrish Mar 14, 2024
39f9328
Fix bug where existing schema is not being associated with segment
findingrish Mar 14, 2024
924268d
checkstyle
findingrish Mar 14, 2024
e14ab29
Merge remote-tracking branch 'upstream/master' into coordinator_schem…
findingrish Mar 14, 2024
ca33d8b
Fix build
findingrish Mar 14, 2024
cfece06
Fix build
findingrish Mar 14, 2024
e103336
Fix IndexerSQLMetadataStorageCoordinatorTest#testCommitReplaceSegment…
findingrish Mar 14, 2024
9875c44
Merge remote-tracking branch 'upstream/master' into coordinator_schem…
findingrish Mar 14, 2024
634ced2
Fix DataSegmentPlusTest failure
findingrish Mar 14, 2024
43f1850
Add logs when schemaId or numRows is null in the cache
findingrish Mar 14, 2024
12e36d0
Fix: Schedule schemaBackfillExecutor at fixed rate
findingrish Mar 15, 2024
c65b2f2
Fix schema generation for IndexTask
findingrish Mar 18, 2024
b6b578b
Merge remote-tracking branch 'upstream/master' into coordinator_schem…
findingrish Mar 18, 2024
8d962da
Add test config to disable segmentMetadataQuery in Coordinator and sc…
findingrish Mar 19, 2024
2723796
Update schema poll logic: Poll schema id greater than max schema id p…
findingrish Mar 19, 2024
5969c60
Merge branch 'master' into coordinator_schema_read_write
findingrish Mar 19, 2024
78d760b
Add 2 new IT profiles to test CentralizedDatasourceSchema feature
findingrish Mar 19, 2024
0fa4f8d
Merge remote-tracking branch 'upstream/master' into coordinator_schem…
findingrish Mar 19, 2024
bf20a25
Merge remote-tracking branch 'upstream/master' into coordinator_schem…
findingrish Mar 19, 2024
9f78e1d
Use AtomicReference for latestSchemaId, handle potential race with th…
findingrish Mar 21, 2024
8816298
Fix bug where segment schema is cleared when segment is loaded on mul…
findingrish Mar 21, 2024
ac77102
Merge remote-tracking branch 'upstream/master' into coordinator_schem…
findingrish Mar 21, 2024
21a7320
Replace pair of segments and schema with SegmentAndSchemas, SegmentAn…
findingrish Mar 21, 2024
0963fca
Fix IndexerSqlMetadataStorageCoordinatorSchemaPersistenceTest#testCom…
findingrish Mar 21, 2024
80c88de
Add test to verify the behaviour when segments is present on multiple…
findingrish Mar 21, 2024
f783636
Update CoordinatorSegmentMetadataCache startup logic
findingrish Mar 22, 2024
0786a6a
Fix test
findingrish Mar 22, 2024
3415418
Run CoordinatorSegmentMetadataCache#refresh only on leader node
findingrish Mar 25, 2024
873f373
Trigger datasource schema polling from Coordinator in each refresh cy…
findingrish Mar 25, 2024
e3736af
Add test for ParallelIndexSupervisorTask and CompactionTask. Update s…
findingrish Mar 26, 2024
f4fb2ec
More task side UTs
findingrish Mar 26, 2024
b11ec29
Add custom metrics
findingrish Mar 27, 2024
52e7c05
Merge remote-tracking branch 'upstream/master' into coordinator_schem…
findingrish Mar 27, 2024
9a7b1b6
Use DruidLeaderSelector instead of DruidCoordinator in CliCoordinator…
findingrish Mar 28, 2024
249313a
Minor changes
findingrish Mar 28, 2024
5acb93d
Merge remote-tracking branch 'upstream/master' into coordinator_schem…
findingrish Mar 28, 2024
b0ab4d9
Check if constraint exists before altering table
findingrish Apr 1, 2024
1d1c803
Add config to disable foreign key creation
findingrish Apr 3, 2024
cf7901a
Revert "Add config to disable foreign key creation"
findingrish Apr 4, 2024
5f729f6
Revert "Check if constraint exists before altering table"
findingrish Apr 4, 2024
e883918
Add datasource to the schema table, refactor schema kill logic
findingrish Apr 11, 2024
aa1ab56
Merge remote-tracking branch 'upstream/master' into coordinator_schem…
findingrish Apr 11, 2024
864bc91
Review comments
findingrish Apr 12, 2024
a582417
Minor change
findingrish Apr 12, 2024
6fb906f
Review comments
findingrish Apr 12, 2024
43111e7
Validate CentralizedDatasourceSchemaConfig in broker, coordinator, hi…
findingrish Apr 12, 2024
55cf345
Fix build
findingrish Apr 12, 2024
2b97164
Add version information in schema table
findingrish Apr 14, 2024
03c0c94
Merge remote-tracking branch 'upstream/master' into coordinator_schem…
findingrish Apr 14, 2024
08d0498
Fix build
findingrish Apr 14, 2024
1b4497d
Fix test failures
findingrish Apr 14, 2024
ff76aeb
codeql comments
findingrish Apr 15, 2024
5c181f6
Merge remote-tracking branch 'upstream/master' into coordinator_schem…
findingrish Apr 15, 2024
dd46989
Revert msq changes
findingrish Apr 15, 2024
bc0e4e3
Fix build in ControllerImpl
findingrish Apr 15, 2024
b54bcac
Fix ControllerImplTest
findingrish Apr 15, 2024
29107b9
Refactor createSegmentTable code
findingrish Apr 15, 2024
f8f8a97
Update FingerprintGenerator to use Hex#encodeHexString for converting…
findingrish Apr 15, 2024
8a75693
Change schema version type to integer
findingrish Apr 16, 2024
a768dba
Minor code restructuring in IndexerSQlMetadataStorageCoordinator
findingrish Apr 16, 2024
6e333a0
Remove defensive check in SegmentSchemaManager
findingrish Apr 16, 2024
8548f2a
Refactor schema poll lambda in SqlSegmentsMetadataManager
findingrish Apr 16, 2024
edec5e7
Add docs, some nit code changes
findingrish Apr 17, 2024
e4d525d
Minor changes in SegmentSchemaBackFillQueue
findingrish Apr 17, 2024
86cc24c
Move validation logic to ServerRunnable
findingrish Apr 17, 2024
0f7df79
MinimalSegmentSchemas refactoring and other minor changes
findingrish Apr 17, 2024
43b6b96
Rename MinimalSegmentSchemas to SegmentSchemaMapping
findingrish Apr 17, 2024
b57f987
Merge remote-tracking branch 'upstream/master' into coordinator_schem…
findingrish Apr 17, 2024
08f1c15
Refactor kill schema duty
findingrish Apr 18, 2024
1c66e5d
Minor change
findingrish Apr 18, 2024
86d2d90
Minor change
findingrish Apr 18, 2024
e629f9a
Merge branch 'coordinator_schema_read_write' of github.com:findingris…
findingrish Apr 18, 2024
77b181f
codeql comments
findingrish Apr 18, 2024
bf37982
Rename DataSegmentWithSchemas to DataSegmentsWithSchemas
findingrish Apr 18, 2024
4efe3fb
Update docs
findingrish Apr 18, 2024
7ac88c4
Persist aggregatorFactories in inTransitSMQResult cache
findingrish Apr 18, 2024
5382669
Use schema fingerprint in segments table
findingrish Apr 19, 2024
f3b1711
Add test for changes in ServerRunnable
findingrish Apr 19, 2024
e0db66c
Minor code changes
findingrish Apr 19, 2024
8c29649
Fix SqlMetadataConnectorTest
findingrish Apr 19, 2024
158f124
Refactor code in IndexerSQLMetadataStorageCoordinator
findingrish Apr 19, 2024
5e6dff5
Some more refactoring
findingrish Apr 19, 2024
fdc46cc
Some more refactoring
findingrish Apr 19, 2024
e9dbe3c
Minor change
findingrish Apr 19, 2024
f3545ac
Fix test failure
findingrish Apr 19, 2024
6138338
Merge remote-tracking branch 'upstream/master' into coordinator_schem…
findingrish Apr 19, 2024
aba03e0
Fix bug in IndexerSQLMetadataStorageCoordinator#commitReplaceSegments
findingrish Apr 19, 2024
55c4ed4
Merge remote-tracking branch 'upstream/master' into coordinator_schem…
findingrish Apr 21, 2024
28359e8
Update docs
findingrish Apr 21, 2024
7886412
Update segment schema cache metrics
findingrish Apr 21, 2024
14efbac
Temporary change
findingrish Apr 21, 2024
ca571ee
Change schemaMap in SegmentSchemaCache from ConcurrentMap to Immutabl…
findingrish Apr 21, 2024
4277760
Update docs
findingrish Apr 21, 2024
5e7a6a6
Update test
findingrish Apr 21, 2024
86351ae
Fix checkstyle
findingrish Apr 21, 2024
a5827a6
Add test for FingerprintGenerator
findingrish Apr 22, 2024
5383967
Add config validation in CliPeon
findingrish Apr 22, 2024
5bf86be
Initialize schema cache
findingrish Apr 22, 2024
a07eccf
Enable feature in IngestionTestBase
findingrish Apr 22, 2024
0023d24
Fix SqlSegmentsMetadataManagerSchemaPollTest
findingrish Apr 22, 2024
85208f2
Minor code changes
findingrish Apr 22, 2024
11e5ca7
Poll schema for all datasources in the inventory in Broker
findingrish Apr 22, 2024
e78fb65
Tests for coverage
findingrish Apr 23, 2024
bb7e98b
Minor change in SegmentSchemaCache
findingrish Apr 23, 2024
ae43a83
Tests to meet coverage
findingrish Apr 23, 2024
26a6472
Expose a method in AbstractSegmentMetadataCache to fetch aggregators …
findingrish Apr 23, 2024
e65ff4d
Merge remote-tracking branch 'upstream/master' into coordinator_schem…
findingrish Apr 23, 2024
a05043b
Add a note
findingrish Apr 23, 2024
c44b289
nit changes
findingrish Apr 23, 2024
9033a46
Rename some methods, add null check when accessing cacheExecFuture
findingrish Apr 23, 2024
7e349ce
Enable CentralizedDatasourceSchema config in ParallelIndexSupervisorT…
findingrish Apr 23, 2024
472bce9
Encapsulate finalizedSegmentStats & finalizedSegmentSchema in a class…
findingrish Apr 23, 2024
bc1e131
Fix potential npe while accessing segmentMetadataInfo
findingrish Apr 24, 2024
4adfc77
Merge remote-tracking branch 'upstream/master' into coordinator_schem…
findingrish Apr 24, 2024
Rename some methods, add null check when accessing cacheExecFuture
findingrish committed Apr 23, 2024
commit 9033a468f3d812fcbec4428022db00aa2c31ecc6
Original file line number Diff line number Diff line change
@@ -168,16 +168,18 @@ public void stop()
   {
     callbackExec.shutdownNow();
     cacheExec.shutdownNow();
-    segmentSchemaCache.uninitialize();
-    segmentSchemaBackfillQueue.leaderStop();
-    cacheExecFuture = null;
+    segmentSchemaCache.onLeaderStop();
+    segmentSchemaBackfillQueue.onLeaderStop();
+    if (cacheExecFuture != null) {
+      cacheExecFuture.cancel(true);
+    }
   }

-  public void leaderStart()
+  public void onLeaderStart()
   {
-    log.info("Initializing cache.");
+    log.info("Initializing cache on leader node.");
     try {
-      segmentSchemaBackfillQueue.leaderStart();
+      segmentSchemaBackfillQueue.onLeaderStart();
       cacheExecFuture = cacheExec.submit(this::cacheExecLoop);
       if (config.isAwaitInitializationOnStart()) {
         awaitInitialization();
@@ -188,12 +190,14 @@ public void leaderStart()
     }
   }

-  public void leaderStop()
+  public void onLeaderStop()
   {
-    log.info("Stopping cache.");
-    cacheExecFuture.cancel(true);
-    segmentSchemaCache.uninitialize();
-    segmentSchemaBackfillQueue.leaderStop();
+    log.info("No longer leader, stopping cache.");
+    if (cacheExecFuture != null) {
+      cacheExecFuture.cancel(true);
+    }
+    segmentSchemaCache.onLeaderStop();
+    segmentSchemaBackfillQueue.onLeaderStop();
   }

   /**
@@ -401,16 +405,13 @@ private void filterRealtimeSegments(Set<SegmentId> segmentIds)
   private Set<SegmentId> filterSegmentWithCachedSchema(Set<SegmentId> segmentIds)
   {
     Set<SegmentId> cachedSegments = new HashSet<>();
-    synchronized (lock) {
-      for (SegmentId id : segmentIds) {
-        if (segmentSchemaCache.isSchemaCached(id)) {
-          cachedSegments.add(id);
-        }
+    for (SegmentId id : segmentIds) {
+      if (segmentSchemaCache.isSchemaCached(id)) {
+        cachedSegments.add(id);
       }
     }

     segmentIds.removeAll(cachedSegments);
-
     return cachedSegments;
   }

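
Note on the null checks above: in these hunks `cacheExecFuture` is assigned in `onLeaderStart()`, so a node that is stopped without ever becoming leader would reach `cancel(true)` on a null reference unless the call is guarded. The following is a minimal, self-contained sketch of that lifecycle. `LeaderScopedCache`, `isRunning()` and the loop body are hypothetical stand-ins for illustration and are not the Druid classes.

```java
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;

/** Hypothetical stand-in for a leader-scoped cache; not the Druid implementation. */
final class LeaderScopedCache
{
  private final ExecutorService cacheExec = Executors.newSingleThreadExecutor();

  // Stays null until this node becomes leader for the first time.
  private volatile Future<?> cacheExecFuture;

  void onLeaderStart()
  {
    cacheExecFuture = cacheExec.submit(this::cacheExecLoop);
  }

  void onLeaderStop()
  {
    // Read the field once, then guard: the node may never have been leader.
    Future<?> future = cacheExecFuture;
    if (future != null) {
      future.cancel(true);
    }
  }

  void stop()
  {
    cacheExec.shutdownNow();
    onLeaderStop();
  }

  boolean isRunning()
  {
    Future<?> future = cacheExecFuture;
    return future != null && !future.isDone();
  }

  // Placeholder loop (assumption): idles until the thread is interrupted by cancel(true).
  private void cacheExecLoop()
  {
    try {
      while (!Thread.currentThread().isInterrupted()) {
        Thread.sleep(10);
      }
    }
    catch (InterruptedException e) {
      Thread.currentThread().interrupt();
    }
  }
}
```

Calling `onLeaderStop()` or `stop()` on an instance that never started is a no-op in this sketch, which is the behaviour the guarded `cancel(true)` calls in the diff aim for.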
Original file line number Diff line number Diff line change
@@ -94,20 +94,24 @@ public SegmentSchemaBackFillQueue(
   public void stop()
   {
     this.executor.shutdownNow();
-    scheduledFuture = null;
+    if (scheduledFuture != null) {
+      scheduledFuture.cancel(true);
+    }
   }

-  public void leaderStart()
+  public void onLeaderStart()
   {
     if (isEnabled()) {
       scheduledFuture = executor.scheduleAtFixedRate(this::processBatchesDueSafely, executionPeriod, executionPeriod, TimeUnit.MILLISECONDS);
     }
   }

-  public void leaderStop()
+  public void onLeaderStop()
   {
     if (isEnabled()) {
-      scheduledFuture.cancel(true);
+      if (scheduledFuture != null) {
+        scheduledFuture.cancel(true);
+      }
     }
   }

Original file line number Diff line number Diff line change
@@ -46,7 +46,7 @@
  * This mapping is updated on each database poll {@link SegmentSchemaCache#finalizedSegmentMetadata}.
  * Segment schema created since last DB poll is also fetched and updated in the cache {@code finalizedSegmentSchema}.
  * <p>
- * Additionally, this class caches schema for realtime segments in {@link SegmentSchemaCache#realtimeSegmentSchemaMap}. This mapping
+ * Additionally, this class caches schema for realtime segments in {@link SegmentSchemaCache#realtimeSegmentSchema}. This mapping
  * is cleared either when the segment is removed or marked as finalized.
  * <p>
  * Finalized segments which do not have their schema information present in the DB, fetch their schema via SMQ.
@@ -63,7 +63,9 @@ public class SegmentSchemaCache
 {
   private static final Logger log = new Logger(SegmentSchemaCache.class);

-  // Cache is marked initialized after first DB poll.
+  /**
+   * Cache is marked initialized after first DB poll.
+   */
   private final AtomicReference<CountDownLatch> initialized = new AtomicReference<>(new CountDownLatch(1));

   /**
@@ -81,22 +83,14 @@ public class SegmentSchemaCache
    * Schema information for realtime segment. This mapping is updated when schema for realtime segment is received.
    * The mapping is removed when the segment is either removed or marked as finalized.
    */
-  private final ConcurrentMap<SegmentId, SchemaPayloadPlus> realtimeSegmentSchemaMap = new ConcurrentHashMap<>();
+  private final ConcurrentMap<SegmentId, SchemaPayloadPlus> realtimeSegmentSchema = new ConcurrentHashMap<>();

   /**
    * If the segment schema is fetched via SMQ, subsequently it is added here.
    * The mapping is removed when the schema information is backfilled in the DB.
    */
   private final ConcurrentMap<SegmentId, SchemaPayloadPlus> inTransitSMQResults = new ConcurrentHashMap<>();

-  private final ServiceEmitter emitter;
-
-  @Inject
-  public SegmentSchemaCache(ServiceEmitter emitter)
-  {
-    this.emitter = emitter;
-  }
-
   /**
    * Once the schema information is backfilled in the DB, it is added here.
    * This map is cleared after each DB poll.
@@ -106,23 +100,29 @@ public SegmentSchemaCache(ServiceEmitter emitter)
    */
   private final ConcurrentMap<SegmentId, SchemaPayloadPlus> inTransitSMQPublishedResults = new ConcurrentHashMap<>();

+  private final ServiceEmitter emitter;
+
+  @Inject
+  public SegmentSchemaCache(ServiceEmitter emitter)
+  {
+    this.emitter = emitter;
+  }
+
   public void setInitialized()
   {
-    log.info("Initializing SegmentSchemaCache.");
     if (initialized.get().getCount() == 1) {
       initialized.get().countDown();
       log.info("SegmentSchemaCache is initialized.");
     }
   }

   /**
-   * Uninitialize is called when the current node is no longer the leader.
+   * This method is called when the current node is no longer the leader.
    * The schema is cleared except for {@code realtimeSegmentSchemaMap}.
-   * Schema map continues to be updated on both the leader and follower nodes.
+   * Realtime schema continues to be updated on both the leader and follower nodes.
    */
-  public void uninitialize()
+  public void onLeaderStop()
   {
-    log.info("[%s] is uninitializing.", getClass().getSimpleName());
     initialized.set(new CountDownLatch(1));

     finalizedSegmentMetadata = ImmutableMap.of();
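
Note on the `initialized` field: a `CountDownLatch` is one-shot and cannot be reset once it reaches zero, which is presumably why the class holds it inside an `AtomicReference` and swaps in a fresh latch when leadership is lost. A minimal sketch of that re-armable gate is shown here. `InitializationGate` and its method names are hypothetical, and the unconditional `countDown()` is a simplification of the `getCount() == 1` check in the diff.

```java
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicReference;

/** Hypothetical re-armable initialization gate; a sketch, not the Druid class. */
final class InitializationGate
{
  private final AtomicReference<CountDownLatch> initialized =
      new AtomicReference<>(new CountDownLatch(1));

  /** Opens the gate. countDown() on a latch already at zero has no effect. */
  void setInitialized()
  {
    initialized.get().countDown();
  }

  /** Re-arms the gate by replacing the latch, e.g. when leadership is lost. */
  void reset()
  {
    initialized.set(new CountDownLatch(1));
  }

  boolean isInitialized()
  {
    return initialized.get().getCount() == 0;
  }

  /** Waits for the current latch; returns false on timeout or interruption. */
  boolean awaitInitialization(long timeoutMillis)
  {
    try {
      return initialized.get().await(timeoutMillis, TimeUnit.MILLISECONDS);
    }
    catch (InterruptedException e) {
      Thread.currentThread().interrupt();
      return false;
    }
  }
}
```

One consequence of this design is that a thread already waiting on the old latch keeps waiting on that instance after `reset()`; only callers that read the reference afterwards see the new latch.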
@@ -166,7 +166,7 @@ public void updateFinalizedSegmentSchemaReference(ImmutableMap<String, SchemaPay
    */
   public void addRealtimeSegmentSchema(SegmentId segmentId, RowSignature rowSignature, long numRows)
   {
-    realtimeSegmentSchemaMap.put(segmentId, new SchemaPayloadPlus(new SchemaPayload(rowSignature), numRows));
+    realtimeSegmentSchema.put(segmentId, new SchemaPayloadPlus(new SchemaPayload(rowSignature), numRows));
   }

   /**
@@ -211,31 +211,34 @@ public void resetInTransitSMQResultPublishedOnDBPoll()
    */
   public Optional<SchemaPayloadPlus> getSchemaForSegment(SegmentId segmentId)
   {
-    // We first look up the schema in the realtime map. This ensures that during handoff
+    // First look up the schema in the realtime map. This ensures that during handoff
     // there is no window where segment schema is missing from the cache.
     // If were to look up the finalized segment map first, during handoff it is possible
     // that segment schema isn't polled yet and thus missing from the map and by the time
     // we look up the schema in the realtime map, it has been removed.
-    if (realtimeSegmentSchemaMap.containsKey(segmentId)) {
-      return Optional.of(realtimeSegmentSchemaMap.get(segmentId));
+    SchemaPayloadPlus payloadPlus = realtimeSegmentSchema.get(segmentId);
+    if (payloadPlus != null) {
+      return Optional.of(payloadPlus);
     }

     // it is important to lookup {@code inTransitSMQResults} before {@code inTransitSMQPublishedResults}
     // other way round, if a segment schema is just published it is possible that the schema is missing
     // in {@code inTransitSMQPublishedResults} and by the time we check {@code inTransitSMQResults} it is removed.

     // segment schema has been fetched via SMQ
-    if (inTransitSMQResults.containsKey(segmentId)) {
-      return Optional.of(inTransitSMQResults.get(segmentId));
+    payloadPlus = inTransitSMQResults.get(segmentId);
+    if (payloadPlus != null) {
+      return Optional.of(payloadPlus);
     }

     // segment schema has been fetched via SMQ and the schema has been published to the DB
-    if (inTransitSMQPublishedResults.containsKey(segmentId)) {
-      return Optional.of(inTransitSMQPublishedResults.get(segmentId));
+    payloadPlus = inTransitSMQPublishedResults.get(segmentId);
+    if (payloadPlus != null) {
+      return Optional.of(payloadPlus);
     }

-    SegmentMetadata segmentMetadata = finalizedSegmentMetadata.get(segmentId);
     // segment schema has been polled from the DB
+    SegmentMetadata segmentMetadata = finalizedSegmentMetadata.get(segmentId);
     if (segmentMetadata != null) {
       SchemaPayload schemaPayload = finalizedSegmentSchema.get(segmentMetadata.getSchemaFingerprint());
       if (schemaPayload != null) {
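
Note on replacing `containsKey` + `get` with a single `get`: on a concurrent map these are two independent reads, so another thread can remove the entry between them; `get` then returns null and `Optional.of(null)` throws a `NullPointerException`. Reading once and null-checking the result closes that window. A minimal sketch of the tiered lookup follows, assuming `java.util.Optional`. `TieredSchemaLookup`, its field names and the `String` keys and values are hypothetical simplifications of the maps in the diff.

```java
import java.util.Optional;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ConcurrentMap;

/** Hypothetical simplification of the tiered schema lookup; not the Druid class. */
final class TieredSchemaLookup
{
  final ConcurrentMap<String, String> realtime = new ConcurrentHashMap<>();
  final ConcurrentMap<String, String> inTransit = new ConcurrentHashMap<>();
  final ConcurrentMap<String, String> published = new ConcurrentHashMap<>();

  Optional<String> getSchema(String segmentId)
  {
    // Each tier is read exactly once, so a concurrent remove() cannot
    // slip in between an existence check and the read of the value.
    String schema = realtime.get(segmentId);
    if (schema != null) {
      return Optional.of(schema);
    }

    // Same order as the diff: in-transit results before published results.
    schema = inTransit.get(segmentId);
    if (schema != null) {
      return Optional.of(schema);
    }

    schema = published.get(segmentId);
    if (schema != null) {
      return Optional.of(schema);
    }

    return Optional.empty();
  }
}
```

The lookup order still matters independently of this fix: each tier is checked before the tier an entry moves into next, as the comments in the hunk explain.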
@@ -256,7 +259,7 @@ public Optional<SchemaPayloadPlus> getSchemaForSegment(SegmentId segmentId)
    */
   public boolean isSchemaCached(SegmentId segmentId)
   {
-    return realtimeSegmentSchemaMap.containsKey(segmentId) ||
+    return realtimeSegmentSchema.containsKey(segmentId) ||
            inTransitSMQResults.containsKey(segmentId) ||
            inTransitSMQPublishedResults.containsKey(segmentId) ||
            isFinalizedSegmentSchemaCached(segmentId);
@@ -276,19 +279,13 @@ private boolean isFinalizedSegmentSchemaCached(SegmentId segmentId)
    */
   public boolean segmentRemoved(SegmentId segmentId)
   {
-    if (!isSchemaCached(segmentId)) {
-      return false;
-    }
     // remove the segment from all the maps
-    realtimeSegmentSchemaMap.remove(segmentId);
+    realtimeSegmentSchema.remove(segmentId);
     inTransitSMQResults.remove(segmentId);
     inTransitSMQPublishedResults.remove(segmentId);

-    // Stale schema payload from finalizedSegmentSchema is not removed
-    // as it could be referenced by other segments.
-    // Stale schema from the cache would get cleared on full schema refresh from the DB,
-    // which would be triggered after any stale schema is deleted from the DB.
-    // Also, segment is not cleared from finalizedSegmentStats since it updated on each DB poll.
+    // Since finalizedSegmentMetadata & finalizedSegmentSchema is updated on each DB poll,
+    // it is fine to not remove the segment from them.
     return true;
   }

@@ -297,12 +294,12 @@ public boolean segmentRemoved(SegmentId segmentId)
    */
   public void realtimeSegmentRemoved(SegmentId segmentId)
   {
-    realtimeSegmentSchemaMap.remove(segmentId);
+    realtimeSegmentSchema.remove(segmentId);
   }

   public void emitStats()
   {
-    emitter.emit(ServiceMetricEvent.builder().setMetric("schemacache/realtime/size", realtimeSegmentSchemaMap.size()));
+    emitter.emit(ServiceMetricEvent.builder().setMetric("schemacache/realtime/size", realtimeSegmentSchema.size()));
     emitter.emit(ServiceMetricEvent.builder().setMetric("schemacache/finalizedSegmentMetadata/size", finalizedSegmentMetadata.size()));
     emitter.emit(ServiceMetricEvent.builder().setMetric("schemacache/finalizedSchemaPayload/size", finalizedSegmentSchema.size()));
     emitter.emit(ServiceMetricEvent.builder().setMetric("schemacache/inTransitSMQResults/size", inTransitSMQResults.size()));
Original file line number Diff line number Diff line change
@@ -431,7 +431,7 @@ private void becomeLeader()
       lookupCoordinatorManager.start();
       serviceAnnouncer.announce(self);
       if (coordinatorSegmentMetadataCache != null) {
-        coordinatorSegmentMetadataCache.leaderStart();
+        coordinatorSegmentMetadataCache.onLeaderStart();
       }
       final int startingLeaderCounter = coordLeaderSelector.localTerm();

@@ -513,7 +513,7 @@ private void stopBeingLeader()
       log.info("I am no longer the leader...");

       if (coordinatorSegmentMetadataCache != null) {
-        coordinatorSegmentMetadataCache.leaderStop();
+        coordinatorSegmentMetadataCache.onLeaderStop();
       }
       taskMaster.onLeaderStop();
       serviceAnnouncer.unannounce(self);
Original file line number Diff line number Diff line change
@@ -120,7 +120,7 @@ public void tearDown() throws Exception
   {
     super.tearDown();
     if (runningSchema != null) {
-      runningSchema.leaderStop();
+      runningSchema.onLeaderStop();
     }
   }

@@ -159,7 +159,7 @@ public void markDataSourceAsNeedRebuild(String datasource)
       }
     };

-    runningSchema.leaderStart();
+    runningSchema.onLeaderStart();
     runningSchema.awaitInitialization();
     return runningSchema;
   }
@@ -345,7 +345,7 @@ public void refresh(
       }
     };

-    schema.leaderStart();
+    schema.onLeaderStart();
     schema.awaitInitialization();

     final Map<SegmentId, AvailableSegmentMetadata> segmentMetadatas = schema.getSegmentMetadataSnapshot();
@@ -1316,7 +1316,7 @@ void updateSchemaForRealtimeSegments(SegmentSchemas segmentSchemas)
       }
     };

-    schema.leaderStart();
+    schema.onLeaderStart();
     schema.awaitInitialization();

     AvailableSegmentMetadata availableSegmentMetadata = schema.getAvailableSegmentMetadata(DATASOURCE3, realtimeSegment1.getId());
@@ -1400,7 +1400,7 @@ public void refresh(Set<SegmentId> segmentsToRefresh, Set<String> dataSourcesToR
       }
     };

-    schema.leaderStart();
+    schema.onLeaderStart();
     schema.awaitInitialization();
     Assert.assertTrue(refresh1Latch.await(10, TimeUnit.SECONDS));

@@ -1587,7 +1587,7 @@ public void refresh(Set<SegmentId> segmentsToRefresh, Set<String> dataSourcesToR
     serverView.addSegment(segment1, ServerType.HISTORICAL);
     serverView.addSegment(segment2, ServerType.HISTORICAL);

-    schema.leaderStart();
+    schema.onLeaderStart();
     schema.awaitInitialization();

     // verify SMQ is not executed, since the schema is already cached
Original file line number Diff line number Diff line change
@@ -115,7 +115,7 @@ public void processBatchesDue()

     segmentIdSchemaMap.put(segment.getId().toString(), Pair.of(new SchemaPayload(rowSignature, aggregatorFactoryMap), 20));
     segmentSchemaBackFillQueue.add(segment.getId(), rowSignature, aggregatorFactoryMap, 20);
-    segmentSchemaBackFillQueue.leaderStart();
+    segmentSchemaBackFillQueue.onLeaderStart();
     latch.await();
     segmentSchemaTestUtils.verifySegmentSchema(segmentIdSchemaMap);
   }