diff --git a/build.gradle b/build.gradle index 6e6dadb7ebfa3..9ee756d41e11e 100644 --- a/build.gradle +++ b/build.gradle @@ -56,7 +56,7 @@ buildscript { ext.hazelcastVersion = '5.3.6' ext.ebeanVersion = '15.5.2' ext.googleJavaFormatVersion = '1.18.1' - ext.openLineageVersion = '1.19.0' + ext.openLineageVersion = '1.24.2' ext.logbackClassicJava8 = '1.2.12' ext.docker_registry = 'acryldata' diff --git a/datahub-upgrade/src/main/java/com/linkedin/datahub/upgrade/system/AbstractMCLStep.java b/datahub-upgrade/src/main/java/com/linkedin/datahub/upgrade/system/AbstractMCLStep.java index 6c70aee88675c..cd7947ce3c11a 100644 --- a/datahub-upgrade/src/main/java/com/linkedin/datahub/upgrade/system/AbstractMCLStep.java +++ b/datahub-upgrade/src/main/java/com/linkedin/datahub/upgrade/system/AbstractMCLStep.java @@ -1,13 +1,12 @@ package com.linkedin.datahub.upgrade.system; -import static com.linkedin.metadata.Constants.DATA_HUB_UPGRADE_RESULT_ASPECT_NAME; - import com.linkedin.common.urn.Urn; import com.linkedin.datahub.upgrade.UpgradeContext; import com.linkedin.datahub.upgrade.UpgradeStep; import com.linkedin.datahub.upgrade.UpgradeStepResult; import com.linkedin.datahub.upgrade.impl.DefaultUpgradeStepResult; import com.linkedin.events.metadata.ChangeType; +import com.linkedin.metadata.aspect.SystemAspect; import com.linkedin.metadata.boot.BootstrapStep; import com.linkedin.metadata.entity.AspectDao; import com.linkedin.metadata.entity.EntityService; @@ -16,10 +15,13 @@ import com.linkedin.metadata.entity.ebean.PartitionedStream; import com.linkedin.metadata.entity.restoreindices.RestoreIndicesArgs; import com.linkedin.metadata.utils.AuditStampUtils; +import com.linkedin.upgrade.DataHubUpgradeResult; import com.linkedin.upgrade.DataHubUpgradeState; import com.linkedin.util.Pair; import io.datahubproject.metadata.context.OperationContext; import java.util.List; +import java.util.Map; +import java.util.Optional; import java.util.concurrent.ExecutionException; import java.util.concurrent.Future; import java.util.function.Function; @@ -33,6 +35,8 @@ */ @Slf4j public abstract class AbstractMCLStep implements UpgradeStep { + public static final String LAST_URN_KEY = "lastUrn"; + private final OperationContext opContext; private final EntityService entityService; private final AspectDao aspectDao; @@ -70,10 +74,30 @@ protected Urn getUpgradeIdUrn() { @Override public Function executable() { return (context) -> { + // Resume state + Optional prevResult = + context.upgrade().getUpgradeResult(opContext, getUpgradeIdUrn(), entityService); + String resumeUrn = + prevResult + .filter( + result -> + DataHubUpgradeState.IN_PROGRESS.equals(result.getState()) + && result.getResult() != null + && result.getResult().containsKey(LAST_URN_KEY)) + .map(result -> result.getResult().get(LAST_URN_KEY)) + .orElse(null); + if (resumeUrn != null) { + log.info("{}: Resuming from URN: {}", getUpgradeIdUrn(), resumeUrn); + } // re-using for configuring the sql scan RestoreIndicesArgs args = - new RestoreIndicesArgs().aspectName(getAspectName()).batchSize(batchSize).limit(limit); + new RestoreIndicesArgs() + .aspectName(getAspectName()) + .batchSize(batchSize) + .lastUrn(resumeUrn) + .urnBasedPagination(resumeUrn != null) + .limit(limit); if (getUrnLike() != null) { args = args.urnLike(getUrnLike()); @@ -86,40 +110,62 @@ public Function executable() { batch -> { log.info("Processing batch({}) of size {}.", getAspectName(), batchSize); - List, Boolean>> futures; - + List, SystemAspect>> futures; futures = 
EntityUtils.toSystemAspectFromEbeanAspects( opContext.getRetrieverContext().get(), batch.collect(Collectors.toList())) .stream() .map( - systemAspect -> - entityService.alwaysProduceMCLAsync( - opContext, - systemAspect.getUrn(), - systemAspect.getUrn().getEntityType(), - getAspectName(), - systemAspect.getAspectSpec(), - null, - systemAspect.getRecordTemplate(), - null, - systemAspect - .getSystemMetadata() - .setRunId(id()) - .setLastObserved(System.currentTimeMillis()), - AuditStampUtils.createDefaultAuditStamp(), - ChangeType.UPSERT)) - .collect(Collectors.toList()); - - futures.forEach( - f -> { - try { - f.getFirst().get(); - } catch (InterruptedException | ExecutionException e) { - throw new RuntimeException(e); - } - }); + systemAspect -> { + Pair, Boolean> future = + entityService.alwaysProduceMCLAsync( + opContext, + systemAspect.getUrn(), + systemAspect.getUrn().getEntityType(), + getAspectName(), + systemAspect.getAspectSpec(), + null, + systemAspect.getRecordTemplate(), + null, + systemAspect + .getSystemMetadata() + .setRunId(id()) + .setLastObserved(System.currentTimeMillis()), + AuditStampUtils.createDefaultAuditStamp(), + ChangeType.UPSERT); + return Pair., SystemAspect>of( + future.getFirst(), systemAspect); + }) + .toList(); + + SystemAspect lastAspect = + futures.stream() + .map( + f -> { + try { + f.getFirst().get(); + return f.getSecond(); + } catch (InterruptedException | ExecutionException e) { + throw new RuntimeException(e); + } + }) + .reduce((a, b) -> b) + .orElse(null); + + // record progress + if (lastAspect != null) { + log.info( + "{}: Saving state. Last urn:{}", getUpgradeIdUrn(), lastAspect.getUrn()); + context + .upgrade() + .setUpgradeResult( + opContext, + getUpgradeIdUrn(), + entityService, + DataHubUpgradeState.IN_PROGRESS, + Map.of(LAST_URN_KEY, lastAspect.getUrn().toString())); + } if (batchDelayMs > 0) { log.info("Sleeping for {} ms", batchDelayMs); @@ -142,12 +188,23 @@ public Function executable() { @Override /** Returns whether the upgrade should be skipped. */ public boolean skip(UpgradeContext context) { - boolean previouslyRun = - entityService.exists( - opContext, getUpgradeIdUrn(), DATA_HUB_UPGRADE_RESULT_ASPECT_NAME, true); - if (previouslyRun) { - log.info("{} was already run. Skipping.", id()); + Optional prevResult = + context.upgrade().getUpgradeResult(opContext, getUpgradeIdUrn(), entityService); + + boolean previousRunFinal = + prevResult + .filter( + result -> + DataHubUpgradeState.SUCCEEDED.equals(result.getState()) + || DataHubUpgradeState.ABORTED.equals(result.getState())) + .isPresent(); + + if (previousRunFinal) { + log.info( + "{} was already run. 
State: {} Skipping.", + id(), + prevResult.map(DataHubUpgradeResult::getState)); } - return previouslyRun; + return previousRunFinal; } } diff --git a/datahub-upgrade/src/main/java/com/linkedin/datahub/upgrade/system/schemafield/GenerateSchemaFieldsFromSchemaMetadataStep.java b/datahub-upgrade/src/main/java/com/linkedin/datahub/upgrade/system/schemafield/GenerateSchemaFieldsFromSchemaMetadataStep.java index eece83f4ab713..55bc8edbf6a76 100644 --- a/datahub-upgrade/src/main/java/com/linkedin/datahub/upgrade/system/schemafield/GenerateSchemaFieldsFromSchemaMetadataStep.java +++ b/datahub-upgrade/src/main/java/com/linkedin/datahub/upgrade/system/schemafield/GenerateSchemaFieldsFromSchemaMetadataStep.java @@ -1,5 +1,6 @@ package com.linkedin.datahub.upgrade.system.schemafield; +import static com.linkedin.datahub.upgrade.system.AbstractMCLStep.LAST_URN_KEY; import static com.linkedin.metadata.Constants.APP_SOURCE; import static com.linkedin.metadata.Constants.DATASET_ENTITY_NAME; import static com.linkedin.metadata.Constants.SCHEMA_METADATA_ASPECT_NAME; @@ -61,7 +62,6 @@ */ @Slf4j public class GenerateSchemaFieldsFromSchemaMetadataStep implements UpgradeStep { - private static final String LAST_URN_KEY = "lastUrn"; private static final List REQUIRED_ASPECTS = List.of(SCHEMA_METADATA_ASPECT_NAME, STATUS_ASPECT_NAME); diff --git a/datahub-upgrade/src/test/java/com/linkedin/datahub/upgrade/DatahubUpgradeNonBlockingTest.java b/datahub-upgrade/src/test/java/com/linkedin/datahub/upgrade/DatahubUpgradeNonBlockingTest.java index f340e688ad7f7..21bc6b725cba2 100644 --- a/datahub-upgrade/src/test/java/com/linkedin/datahub/upgrade/DatahubUpgradeNonBlockingTest.java +++ b/datahub-upgrade/src/test/java/com/linkedin/datahub/upgrade/DatahubUpgradeNonBlockingTest.java @@ -1,14 +1,18 @@ package com.linkedin.datahub.upgrade; +import static org.mockito.ArgumentMatchers.any; +import static org.mockito.ArgumentMatchers.anyInt; import static org.mockito.ArgumentMatchers.eq; import static org.mockito.Mockito.mock; import static org.mockito.Mockito.times; import static org.mockito.Mockito.verify; +import static org.mockito.Mockito.when; import static org.testng.Assert.assertEquals; import static org.testng.Assert.assertFalse; import static org.testng.Assert.assertTrue; import static org.testng.AssertJUnit.assertNotNull; +import com.linkedin.data.template.StringMap; import com.linkedin.datahub.upgrade.impl.DefaultUpgradeManager; import com.linkedin.datahub.upgrade.system.SystemUpdateNonBlocking; import com.linkedin.datahub.upgrade.system.bootstrapmcps.BootstrapMCPStep; @@ -20,17 +24,30 @@ import com.linkedin.metadata.entity.AspectDao; import com.linkedin.metadata.entity.EntityService; import com.linkedin.metadata.entity.EntityServiceImpl; +import com.linkedin.metadata.entity.ebean.EbeanAspectV2; +import com.linkedin.metadata.entity.ebean.PartitionedStream; import com.linkedin.metadata.entity.restoreindices.RestoreIndicesArgs; import com.linkedin.mxe.Topics; +import com.linkedin.upgrade.DataHubUpgradeResult; +import com.linkedin.upgrade.DataHubUpgradeState; +import com.linkedin.util.Pair; import io.datahubproject.metadata.context.OperationContext; import io.datahubproject.test.metadata.context.TestOperationContexts; +import java.sql.Timestamp; +import java.util.Arrays; import java.util.List; +import java.util.Map; +import java.util.Optional; +import java.util.concurrent.CompletableFuture; import java.util.stream.Collectors; +import java.util.stream.Stream; import javax.inject.Named; +import org.mockito.ArgumentCaptor; 
import org.springframework.beans.factory.annotation.Autowired; import org.springframework.boot.test.context.SpringBootTest; import org.springframework.test.context.ActiveProfiles; import org.springframework.test.context.testng.AbstractTestNGSpringContextTests; +import org.testng.annotations.BeforeClass; import org.testng.annotations.Test; @ActiveProfiles("test") @@ -63,7 +80,12 @@ public class DatahubUpgradeNonBlockingTest extends AbstractTestNGSpringContextTe @Autowired private EntityServiceImpl entityService; - @Autowired private OperationContext opContext; + private OperationContext opContext; + + @BeforeClass + public void init() { + opContext = TestOperationContexts.systemContextNoValidate(); + } @Test public void testSystemUpdateNonBlockingInit() { @@ -81,10 +103,13 @@ public void testSystemUpdateNonBlockingInit() { } @Test - public void testReindexDataJobViaNodesCLLPaging() { + public void testReindexDataJobViaNodesCLLPagingArgs() { EntityService mockService = mock(EntityService.class); AspectDao mockAspectDao = mock(AspectDao.class); + PartitionedStream mockStream = mock(PartitionedStream.class); + when(mockStream.partition(anyInt())).thenReturn(Stream.empty()); + when(mockAspectDao.streamAspectBatches(any(RestoreIndicesArgs.class))).thenReturn(mockStream); ReindexDataJobViaNodesCLL cllUpgrade = new ReindexDataJobViaNodesCLL(opContext, mockService, mockAspectDao, true, 10, 0, 0); @@ -102,9 +127,79 @@ public void testReindexDataJobViaNodesCLLPaging() { .batchSize(10) .limit(0) .aspectName("dataJobInputOutput") + .urnBasedPagination(false) + .lastUrn(null) .urnLike("urn:li:dataJob:%"))); } + @Test + public void testReindexDataJobViaNodesCLLResumePaging() throws Exception { + // Mock services + EntityService mockService = mock(EntityService.class); + AspectDao mockAspectDao = mock(AspectDao.class); + + // Create test data + EbeanAspectV2 aspect1 = createMockEbeanAspect("urn:li:dataJob:job1", "dataJobInputOutput"); + EbeanAspectV2 aspect2 = createMockEbeanAspect("urn:li:dataJob:job2", "dataJobInputOutput"); + EbeanAspectV2 aspect3 = createMockEbeanAspect("urn:li:dataJob:job3", "dataJobInputOutput"); + List initialBatch = Arrays.asList(aspect1, aspect2); + List resumeBatch = Arrays.asList(aspect3); + + // Mock the stream for first batch + PartitionedStream initialStream = mock(PartitionedStream.class); + when(initialStream.partition(anyInt())).thenReturn(Stream.of(initialBatch.stream())); + + // Mock the stream for second batch + PartitionedStream resumeStream = mock(PartitionedStream.class); + when(resumeStream.partition(anyInt())).thenReturn(Stream.of(resumeBatch.stream())); + + // Setup the AspectDao using Answer to handle null safely + when(mockAspectDao.streamAspectBatches(any(RestoreIndicesArgs.class))) + .thenAnswer( + invocation -> { + RestoreIndicesArgs args = invocation.getArgument(0); + if (args.lastUrn() == null) { + return initialStream; + } else if ("urn:li:dataJob:job2".equals(args.lastUrn())) { + return resumeStream; + } + return mock(PartitionedStream.class); + }); + + // Mock successful MCL production + when(mockService.alwaysProduceMCLAsync( + any(), any(), any(), any(), any(), any(), any(), any(), any(), any(), any())) + .thenReturn(Pair.of(CompletableFuture.completedFuture(null), true)); + + // Create the upgrade + ReindexDataJobViaNodesCLL cllUpgrade = + new ReindexDataJobViaNodesCLL(opContext, mockService, mockAspectDao, true, 2, 0, 0); + + // Initial Run + cllUpgrade.steps().get(0).executable().apply(createMockInitialUpgrade()); + + // Resumed + 
cllUpgrade.steps().get(0).executable().apply(createMockResumeUpgrade()); + + // Use ArgumentCaptor to verify the calls + ArgumentCaptor argsCaptor = + ArgumentCaptor.forClass(RestoreIndicesArgs.class); + verify(mockAspectDao, times(2)).streamAspectBatches(argsCaptor.capture()); + + List capturedArgs = argsCaptor.getAllValues(); + + // Verify both the initial and resume calls were made with correct arguments + assertEquals(capturedArgs.get(0).lastUrn(), null); + assertEquals(capturedArgs.get(0).urnBasedPagination(), false); + assertEquals(capturedArgs.get(1).lastUrn(), "urn:li:dataJob:job2"); + assertEquals(capturedArgs.get(1).urnBasedPagination(), true); + + // Verify MCL production was called for each aspect + verify(mockService, times(3)) + .alwaysProduceMCLAsync( + any(), any(), any(), any(), any(), any(), any(), any(), any(), any(), any()); + } + @Test public void testNonBlockingBootstrapMCP() { List mcpTemplate = @@ -123,4 +218,54 @@ public void testNonBlockingBootstrapMCP() { .map(update -> update.getMcpTemplate().getName()) .collect(Collectors.toSet()))); } + + private UpgradeContext createMockInitialUpgrade() { + // Mock the Upgrade instance + Upgrade mockUpgrade = mock(Upgrade.class); + + // Configure the mock upgrade to return no previous result + when(mockUpgrade.getUpgradeResult(any(), any(), any())).thenReturn(Optional.empty()); + + UpgradeContext mockInitialContext = mock(UpgradeContext.class); + when(mockInitialContext.opContext()).thenReturn(opContext); + when(mockInitialContext.upgrade()).thenReturn(mockUpgrade); + when(mockInitialContext.report()).thenReturn(mock(UpgradeReport.class)); + + return mockInitialContext; + } + + private UpgradeContext createMockResumeUpgrade() { + // Mock the Upgrade instance + Upgrade mockUpgrade = mock(Upgrade.class); + DataHubUpgradeResult mockPrevResult = mock(DataHubUpgradeResult.class); + + // Configure the mock previous result + when(mockPrevResult.getState()).thenReturn(DataHubUpgradeState.IN_PROGRESS); + when(mockPrevResult.getResult()) + .thenReturn(new StringMap(Map.of("lastUrn", "urn:li:dataJob:job2"))); + + // Configure the mock upgrade to return our previous result + when(mockUpgrade.getUpgradeResult(any(), any(), any())).thenReturn(Optional.of(mockPrevResult)); + + UpgradeContext mockResumeContext = mock(UpgradeContext.class); + when(mockResumeContext.opContext()).thenReturn(opContext); + when(mockResumeContext.upgrade()).thenReturn(mockUpgrade); + when(mockResumeContext.report()).thenReturn(mock(UpgradeReport.class)); + + return mockResumeContext; + } + + private static EbeanAspectV2 createMockEbeanAspect(String urn, String aspectName) { + Timestamp now = new Timestamp(System.currentTimeMillis()); + return new EbeanAspectV2( + urn, + aspectName, + 0L, + "{}", // metadata + now, // createdOn + "urn:li:corpuser:testUser", // createdBy + null, // createdFor + null // systemMetadata + ); + } } diff --git a/datahub-upgrade/src/test/java/com/linkedin/datahub/upgrade/UpgradeCliApplicationTestConfiguration.java b/datahub-upgrade/src/test/java/com/linkedin/datahub/upgrade/UpgradeCliApplicationTestConfiguration.java index 81d883d8ce36b..5b7b8756f11fb 100644 --- a/datahub-upgrade/src/test/java/com/linkedin/datahub/upgrade/UpgradeCliApplicationTestConfiguration.java +++ b/datahub-upgrade/src/test/java/com/linkedin/datahub/upgrade/UpgradeCliApplicationTestConfiguration.java @@ -19,17 +19,17 @@ @Import(value = {SystemAuthenticationFactory.class}) public class UpgradeCliApplicationTestConfiguration { - @MockBean private UpgradeCli upgradeCli; 
+ @MockBean public UpgradeCli upgradeCli; - @MockBean private Database ebeanServer; + @MockBean public Database ebeanServer; - @MockBean private SearchService searchService; + @MockBean public SearchService searchService; - @MockBean private GraphService graphService; + @MockBean public GraphService graphService; - @MockBean private EntityRegistry entityRegistry; + @MockBean public EntityRegistry entityRegistry; - @MockBean ConfigEntityRegistry configEntityRegistry; + @MockBean public ConfigEntityRegistry configEntityRegistry; @MockBean public EntityIndexBuilders entityIndexBuilders; diff --git a/datahub-web-react/src/app/entity/shared/siblingUtils.ts b/datahub-web-react/src/app/entity/shared/siblingUtils.ts index 2f50dc99df191..aa9e4bcb5e46e 100644 --- a/datahub-web-react/src/app/entity/shared/siblingUtils.ts +++ b/datahub-web-react/src/app/entity/shared/siblingUtils.ts @@ -5,6 +5,7 @@ import * as QueryString from 'query-string'; import { Dataset, Entity, Maybe, SiblingProperties } from '../../../types.generated'; import { GenericEntityProperties } from './types'; import { useIsShowSeparateSiblingsEnabled } from '../../useAppConfig'; +import { downgradeV2FieldPath } from '../dataset/profile/schema/utils/utils'; export function stripSiblingsFromEntity(entity: any) { return { @@ -55,16 +56,30 @@ const combineMerge = (target, source, options) => { return destination; }; -function convertObjectKeysToLowercase(object: Record) { - return Object.fromEntries(Object.entries(object).map(([key, value]) => [key.toLowerCase(), value])); +// this function is responsible for normalizing object keys to make sure merging on key matches keys appropriately +function normalizeObjectKeys(object: Record, isSchemaField = false) { + return Object.fromEntries( + Object.entries(object).map(([key, value]) => { + let normalizedKey = key.toLowerCase(); + if (isSchemaField) { + normalizedKey = downgradeV2FieldPath(normalizedKey) || normalizedKey; + } + return [normalizedKey, value]; + }), + ); } // use when you want to merge an array of objects by key in the object as opposed to by index of array -const mergeArrayOfObjectsByKey = (destinationArray: any[], sourceArray: any[], key: string) => { - const destination = convertObjectKeysToLowercase(keyBy(destinationArray, key)); - const source = convertObjectKeysToLowercase(keyBy(sourceArray, key)); +const mergeArrayOfObjectsByKey = (destinationArray: any[], sourceArray: any[], key: string, isSchemaField = false) => { + const destination = normalizeObjectKeys(keyBy(destinationArray, key), isSchemaField); + const source = normalizeObjectKeys(keyBy(sourceArray, key), isSchemaField); - return values(merge(destination, source)); + return values( + merge(destination, source, { + arrayMerge: combineMerge, + customMerge, + }), + ); }; const mergeTags = (destinationArray, sourceArray, _options) => { @@ -88,7 +103,7 @@ const mergeOwners = (destinationArray, sourceArray, _options) => { }; const mergeFields = (destinationArray, sourceArray, _options) => { - return mergeArrayOfObjectsByKey(destinationArray, sourceArray, 'fieldPath'); + return mergeArrayOfObjectsByKey(destinationArray, sourceArray, 'fieldPath', true); }; function getArrayMergeFunction(key) { @@ -112,7 +127,7 @@ function getArrayMergeFunction(key) { } } -const customMerge = (isPrimary, key) => { +function customMerge(isPrimary, key) { if (key === 'upstream' || key === 'downstream') { return (_secondary, primary) => primary; } @@ -145,7 +160,7 @@ const customMerge = (isPrimary, key) => { customMerge: customMerge.bind({}, 
isPrimary), }); }; -}; +} export const getEntitySiblingData = (baseEntity: T): Maybe => { if (!baseEntity) { diff --git a/docker/profiles/docker-compose.gms.yml b/docker/profiles/docker-compose.gms.yml index 147bbd35ff646..824c8024b05d6 100644 --- a/docker/profiles/docker-compose.gms.yml +++ b/docker/profiles/docker-compose.gms.yml @@ -40,7 +40,6 @@ x-kafka-env: &kafka-env # KAFKA_SCHEMAREGISTRY_URL=http://schema-registry:8081 SCHEMA_REGISTRY_TYPE: INTERNAL KAFKA_SCHEMAREGISTRY_URL: http://datahub-gms:8080/schema-registry/api/ - SPRING_KAFKA_CONSUMER_AUTO_OFFSET_RESET: ${SPRING_KAFKA_CONSUMER_AUTO_OFFSET_RESET:-earliest} x-datahub-quickstart-telemetry-env: &datahub-quickstart-telemetry-env DATAHUB_SERVER_TYPE: ${DATAHUB_SERVER_TYPE:-quickstart} diff --git a/docs/api/tutorials/structured-properties.md b/docs/api/tutorials/structured-properties.md index 9b18aa922290b..b606ce9a8e245 100644 --- a/docs/api/tutorials/structured-properties.md +++ b/docs/api/tutorials/structured-properties.md @@ -66,7 +66,7 @@ mutation createStructuredProperty { qualifiedName:"retentionTime", displayName: "Retention Time", description: "Retention Time is used to figure out how long to retain records in a dataset", - valueType: "urn:li:dataType:number", + valueType: "urn:li:dataType:datahub.number", allowedValues: [ {numberValue: 30, description: "30 days, usually reserved for datasets that are ephemeral and contain pii"}, {numberValue: 90, description:"description: Use this for datasets that drive monthly reporting but contain pii"}, diff --git a/docs/managed-datahub/release-notes/v_0_3_7.md b/docs/managed-datahub/release-notes/v_0_3_7.md index 59b7a23b5e583..19cb04e9f5603 100644 --- a/docs/managed-datahub/release-notes/v_0_3_7.md +++ b/docs/managed-datahub/release-notes/v_0_3_7.md @@ -11,6 +11,11 @@ Recommended CLI/SDK If you are using an older CLI/SDK version, then please upgrade it. This applies for all CLI/SDK usages, if you are using it through your terminal, GitHub Actions, Airflow, in Python SDK somewhere, Java SDK, etc. This is a strong recommendation to upgrade, as we keep on pushing fixes in the CLI, and it helps us support you better. +## Known Issues + +### v0.3.7.3 + * Search page fails to render when filters are applied with a query which returns zero results. 
+ ## Release Changelog --- diff --git a/entity-registry/build.gradle b/entity-registry/build.gradle index 2dedea1f16d99..22e5b601d39db 100644 --- a/entity-registry/build.gradle +++ b/entity-registry/build.gradle @@ -25,6 +25,8 @@ dependencies { because("previous versions are vulnerable to CVE-2022-25857") } } + api project(path: ':li-utils') + api project(path: ':li-utils', configuration: "dataTemplate") dataModel project(':li-utils') annotationProcessor externalDependency.lombok diff --git a/li-utils/src/main/javaPegasus/com/linkedin/common/urn/UrnUtils.java b/li-utils/src/main/javaPegasus/com/linkedin/common/urn/UrnUtils.java index 89f0cd8fbc979..0a2400badfc62 100644 --- a/li-utils/src/main/javaPegasus/com/linkedin/common/urn/UrnUtils.java +++ b/li-utils/src/main/javaPegasus/com/linkedin/common/urn/UrnUtils.java @@ -27,28 +27,6 @@ public static DatasetUrn toDatasetUrn( new DataPlatformUrn(platformName), datasetName, FabricType.valueOf(origin.toUpperCase())); } - /** - * Convert fabric String to FabricType - * - * @param fabric PROD, CORP, EI, DEV, LIT, PRIME - * @return FabricType - */ - @Nonnull - public static FabricType toFabricType(@Nonnull String fabric) { - switch (fabric.toUpperCase()) { - case "PROD": - return FabricType.PROD; - case "CORP": - return FabricType.CORP; - case "EI": - return FabricType.EI; - case "DEV": - return FabricType.DEV; - default: - throw new IllegalArgumentException("Unsupported Fabric Type: " + fabric); - } - } - public static Urn getUrn(String urnStr) { try { return Urn.createFromString(urnStr); diff --git a/metadata-ingestion/setup.py b/metadata-ingestion/setup.py index 3152d0290ec22..2469af74b0334 100644 --- a/metadata-ingestion/setup.py +++ b/metadata-ingestion/setup.py @@ -245,7 +245,7 @@ # Instead, we put the fix in our PyHive fork, so no thrift pin is needed. } -microsoft_common = {"msal>=1.22.0"} +microsoft_common = {"msal>=1.24.0"} iceberg_common = { # Iceberg Python SDK diff --git a/metadata-ingestion/src/datahub/cli/check_cli.py b/metadata-ingestion/src/datahub/cli/check_cli.py index 39ed1b2bfea08..fbe07b64f0e15 100644 --- a/metadata-ingestion/src/datahub/cli/check_cli.py +++ b/metadata-ingestion/src/datahub/cli/check_cli.py @@ -268,7 +268,9 @@ def sql_lineage( ) logger.debug("Sql parsing debug info: %s", lineage.debug_info) - if lineage.debug_info.error: + if lineage.debug_info.table_error: + raise lineage.debug_info.table_error + elif lineage.debug_info.error: logger.debug("Sql parsing error details", exc_info=lineage.debug_info.error) click.echo(lineage.json(indent=4)) diff --git a/metadata-ingestion/src/datahub/ingestion/source/redshift/query.py b/metadata-ingestion/src/datahub/ingestion/source/redshift/query.py index b18b526ef30fc..71a20890d35e8 100644 --- a/metadata-ingestion/src/datahub/ingestion/source/redshift/query.py +++ b/metadata-ingestion/src/datahub/ingestion/source/redshift/query.py @@ -9,6 +9,8 @@ # We use 290 instead instead of the standard 320, because escape characters can add to the length. 
_QUERY_SEQUENCE_LIMIT = 290 +_MAX_COPY_ENTRIES_PER_TABLE = 20 + class RedshiftCommonQuery: CREATE_TEMP_TABLE_CLAUSE = "create temp table" @@ -293,28 +295,37 @@ def alter_table_rename_query( def list_copy_commands_sql( db_name: str, start_time: datetime, end_time: datetime ) -> str: - return """ - select - distinct - "schema" as target_schema, - "table" as target_table, - c.file_name as filename - from - SYS_QUERY_DETAIL as si - join SYS_LOAD_DETAIL as c on - si.query_id = c.query_id - join SVV_TABLE_INFO sti on - sti.table_id = si.table_id - where - database = '{db_name}' - and si.start_time >= '{start_time}' - and si.start_time < '{end_time}' - order by target_schema, target_table, si.start_time asc - """.format( + return """\ +SELECT DISTINCT + target_schema, + target_table, + filename +FROM ( + SELECT + sti."schema" AS target_schema, + sti."table" AS target_table, + c.file_name AS filename, + ROW_NUMBER() OVER ( + PARTITION BY sti."schema", sti."table" + ORDER BY si.start_time DESC + ) AS rn + FROM + SYS_QUERY_DETAIL AS si + JOIN SYS_LOAD_DETAIL AS c ON si.query_id = c.query_id + JOIN SVV_TABLE_INFO sti ON sti.table_id = si.table_id + WHERE + sti.database = '{db_name}' + AND si.start_time >= '{start_time}' + AND si.start_time < '{end_time}' +) subquery +WHERE rn <= {_MAX_COPY_ENTRIES_PER_TABLE} +ORDER BY target_schema, target_table, filename +""".format( # We need the original database name for filtering db_name=db_name, start_time=start_time.strftime(redshift_datetime_format), end_time=end_time.strftime(redshift_datetime_format), + _MAX_COPY_ENTRIES_PER_TABLE=_MAX_COPY_ENTRIES_PER_TABLE, ) @staticmethod diff --git a/metadata-ingestion/src/datahub/sql_parsing/sqlglot_lineage.py b/metadata-ingestion/src/datahub/sql_parsing/sqlglot_lineage.py index b635f8cb47b6d..506bd1d8c6be4 100644 --- a/metadata-ingestion/src/datahub/sql_parsing/sqlglot_lineage.py +++ b/metadata-ingestion/src/datahub/sql_parsing/sqlglot_lineage.py @@ -904,6 +904,15 @@ def _sqlglot_lineage_inner( logger.debug("Parsing lineage from sql statement: %s", sql) statement = parse_statement(sql, dialect=dialect) + if isinstance(statement, sqlglot.exp.Command): + # For unsupported syntax, sqlglot will usually fallback to parsing as a Command. + # This is effectively a parsing error, and we won't get any lineage from it. 
+ # See https://github.com/tobymao/sqlglot/commit/3a13fdf4e597a2f0a3f9fc126a129183fe98262f + # and https://github.com/tobymao/sqlglot/pull/2874 + raise UnsupportedStatementTypeError( + f"Got unsupported syntax for statement: {sql}" + ) + original_statement, statement = statement, statement.copy() # logger.debug( # "Formatted sql statement: %s", diff --git a/metadata-ingestion/tests/unit/sql_parsing/goldens/test_sqlite_attach_database.json b/metadata-ingestion/tests/unit/sql_parsing/goldens/test_sqlite_attach_database.json new file mode 100644 index 0000000000000..bcf31f6be803a --- /dev/null +++ b/metadata-ingestion/tests/unit/sql_parsing/goldens/test_sqlite_attach_database.json @@ -0,0 +1,12 @@ +{ + "query_type": "UNKNOWN", + "query_type_props": {}, + "query_fingerprint": null, + "in_tables": [], + "out_tables": [], + "column_lineage": null, + "debug_info": { + "confidence": 0.0, + "generalized_statement": null + } +} \ No newline at end of file diff --git a/metadata-ingestion/tests/unit/sql_parsing/test_sqlglot_lineage.py b/metadata-ingestion/tests/unit/sql_parsing/test_sqlglot_lineage.py index 90cc863d6bd23..170341230205f 100644 --- a/metadata-ingestion/tests/unit/sql_parsing/test_sqlglot_lineage.py +++ b/metadata-ingestion/tests/unit/sql_parsing/test_sqlglot_lineage.py @@ -1268,3 +1268,14 @@ def test_bigquery_subquery_column_inference() -> None: dialect="bigquery", expected_file=RESOURCE_DIR / "test_bigquery_subquery_column_inference.json", ) + + +def test_sqlite_attach_database() -> None: + assert_sql_result( + """\ +ATTACH DATABASE ':memory:' AS aux1 +""", + dialect="sqlite", + expected_file=RESOURCE_DIR / "test_sqlite_attach_database.json", + allow_table_error=True, + ) diff --git a/metadata-integration/java/acryl-spark-lineage/README.md b/metadata-integration/java/acryl-spark-lineage/README.md index bd0a58b635b48..267e979b0fa07 100644 --- a/metadata-integration/java/acryl-spark-lineage/README.md +++ b/metadata-integration/java/acryl-spark-lineage/README.md @@ -165,6 +165,7 @@ information like tokens. | spark.datahub.rest.server | | http://localhost:8080 | Datahub server url eg: | | spark.datahub.rest.token | | | Authentication token. | | spark.datahub.rest.disable_ssl_verification | | false | Disable SSL certificate validation. Caution: Only use this if you know what you are doing! | +| spark.datahub.rest.disable_chunked_encoding | | false | Disable Chunked Transfer Encoding. In some environments chunked encoding causes issues; this config option disables it.
|| | spark.datahub.rest.max_retries | | 0 | Number of times a request retried if failed | | spark.datahub.rest.retry_interval | | 10 | Number of seconds to wait between retries | | spark.datahub.file.filename | | | The file where metadata will be written if file emitter is set | diff --git a/metadata-integration/java/acryl-spark-lineage/build.gradle b/metadata-integration/java/acryl-spark-lineage/build.gradle index 6620c34021ac4..3f83e5657bbf4 100644 --- a/metadata-integration/java/acryl-spark-lineage/build.gradle +++ b/metadata-integration/java/acryl-spark-lineage/build.gradle @@ -1,7 +1,7 @@ plugins { id("com.palantir.git-version") apply false } -apply plugin: 'java' +apply plugin: 'java-library' apply plugin: 'com.github.johnrengelman.shadow' apply plugin: 'signing' apply plugin: 'io.codearte.nexus-staging' @@ -51,8 +51,8 @@ dependencies { implementation project(':metadata-integration:java:openlineage-converter') - implementation project(path: ':metadata-integration:java:datahub-client', configuration: 'shadow') - implementation project(path: ':metadata-integration:java:openlineage-converter', configuration: 'shadow') + implementation project(path: ':metadata-integration:java:datahub-client') + implementation project(path: ':metadata-integration:java:openlineage-converter') //implementation "io.acryl:datahub-client:0.10.2" implementation "io.openlineage:openlineage-spark_2.12:$openLineageVersion" @@ -91,6 +91,8 @@ shadowJar { zip64 = true archiveClassifier = '' mergeServiceFiles() + project.configurations.implementation.canBeResolved = true + configurations = [project.configurations.implementation] def exclude_modules = project .configurations @@ -106,6 +108,8 @@ shadowJar { exclude(dependency { exclude_modules.contains(it.name) }) + exclude(dependency("org.slf4j::")) + exclude("org/apache/commons/logging/**") } // preventing java multi-release JAR leakage @@ -123,39 +127,36 @@ shadowJar { relocate 'com.sun.activation', 'io.acryl.shaded.com.sun.activation' relocate 'com.sun.codemodel', 'io.acryl.shaded.com.sun.codemodel' relocate 'com.sun.mail', 'io.acryl.shaded.com.sun.mail' - relocate 'com.fasterxml.jackson', 'datahub.spark2.shaded.jackson' - relocate 'org.slf4j', 'datahub.spark2.shaded.org.slf4j' // relocate 'org.apache.hc', 'io.acryl.shaded.http' - relocate 'org.apache.commons.codec', 'datahub.spark2.shaded.o.a.c.codec' - relocate 'org.apache.commons.compress', 'datahub.spark2.shaded.o.a.c.compress' - relocate 'org.apache.commons.lang3', 'datahub.spark2.shaded.o.a.c.lang3' + relocate 'org.apache.commons.codec', 'io.acryl.shaded.org.apache.commons.codec' + relocate 'org.apache.commons.compress', 'io.acryl.shaded.org.apache.commons.compress' + relocate 'org.apache.commons.lang3', 'io.acryl.shaded.org.apache.commons.lang3' relocate 'mozilla', 'datahub.spark2.shaded.mozilla' - relocate 'com.typesafe', 'datahub.spark2.shaded.typesafe' - relocate 'io.opentracing', 'datahub.spark2.shaded.io.opentracing' - relocate 'io.netty', 'datahub.spark2.shaded.io.netty' - relocate 'ch.randelshofer', 'datahub.spark2.shaded.ch.randelshofer' - relocate 'ch.qos', 'datahub.spark2.shaded.ch.qos' + relocate 'com.typesafe', 'io.acryl.shaded.com.typesafe' + relocate 'io.opentracing', 'io.acryl.shaded.io.opentracing' + relocate 'io.netty', 'io.acryl.shaded.io.netty' + relocate 'ch.randelshofer', 'io.acryl.shaded.ch.randelshofer' + relocate 'ch.qos', 'io.acryl.shaded.ch.qos' relocate 'org.springframework', 'io.acryl.shaded.org.springframework' relocate 'com.fasterxml.jackson', 'io.acryl.shaded.jackson' relocate 
'org.yaml', 'io.acryl.shaded.org.yaml' // Required for shading snakeyaml relocate 'net.jcip.annotations', 'io.acryl.shaded.annotations' relocate 'javassist', 'io.acryl.shaded.javassist' relocate 'edu.umd.cs.findbugs', 'io.acryl.shaded.findbugs' - relocate 'org.antlr', 'io.acryl.shaded.org.antlr' - relocate 'antlr', 'io.acryl.shaded.antlr' + //relocate 'org.antlr', 'io.acryl.shaded.org.antlr' + //relocate 'antlr', 'io.acryl.shaded.antlr' relocate 'com.google.common', 'io.acryl.shaded.com.google.common' - relocate 'org.apache.commons', 'io.acryl.shaded.org.apache.commons' relocate 'org.reflections', 'io.acryl.shaded.org.reflections' relocate 'st4hidden', 'io.acryl.shaded.st4hidden' relocate 'org.stringtemplate', 'io.acryl.shaded.org.stringtemplate' relocate 'org.abego.treelayout', 'io.acryl.shaded.treelayout' - relocate 'org.slf4j', 'io.acryl.shaded.slf4j' relocate 'javax.annotation', 'io.acryl.shaded.javax.annotation' relocate 'com.github.benmanes.caffeine', 'io.acryl.shaded.com.github.benmanes.caffeine' relocate 'org.checkerframework', 'io.acryl.shaded.org.checkerframework' relocate 'com.google.errorprone', 'io.acryl.shaded.com.google.errorprone' relocate 'com.sun.jna', 'io.acryl.shaded.com.sun.jna' + } checkShadowJar { diff --git a/metadata-integration/java/acryl-spark-lineage/src/main/java/datahub/spark/DatahubSparkListener.java b/metadata-integration/java/acryl-spark-lineage/src/main/java/datahub/spark/DatahubSparkListener.java index ee0938edb5045..b594f6bae954f 100644 --- a/metadata-integration/java/acryl-spark-lineage/src/main/java/datahub/spark/DatahubSparkListener.java +++ b/metadata-integration/java/acryl-spark-lineage/src/main/java/datahub/spark/DatahubSparkListener.java @@ -120,7 +120,9 @@ public Optional initializeEmitter(Config sparkConf) { boolean disableSslVerification = sparkConf.hasPath(SparkConfigParser.DISABLE_SSL_VERIFICATION_KEY) && sparkConf.getBoolean(SparkConfigParser.DISABLE_SSL_VERIFICATION_KEY); - + boolean disableChunkedEncoding = + sparkConf.hasPath(SparkConfigParser.REST_DISABLE_CHUNKED_ENCODING) + && sparkConf.getBoolean(SparkConfigParser.REST_DISABLE_CHUNKED_ENCODING); int retry_interval_in_sec = sparkConf.hasPath(SparkConfigParser.RETRY_INTERVAL_IN_SEC) ? 
sparkConf.getInt(SparkConfigParser.RETRY_INTERVAL_IN_SEC) @@ -150,6 +152,7 @@ public Optional initializeEmitter(Config sparkConf) { .disableSslVerification(disableSslVerification) .maxRetries(max_retries) .retryIntervalSec(retry_interval_in_sec) + .disableChunkedEncoding(disableChunkedEncoding) .build(); return Optional.of(new RestDatahubEmitterConfig(restEmitterConf)); case "kafka": @@ -374,7 +377,8 @@ private static void initializeMetrics(OpenLineageConfig openLineageConfig) { String disabledFacets; if (openLineageConfig.getFacetsConfig() != null && openLineageConfig.getFacetsConfig().getDisabledFacets() != null) { - disabledFacets = String.join(";", openLineageConfig.getFacetsConfig().getDisabledFacets()); + disabledFacets = + String.join(";", openLineageConfig.getFacetsConfig().getEffectiveDisabledFacets()); } else { disabledFacets = ""; } diff --git a/metadata-integration/java/acryl-spark-lineage/src/main/java/datahub/spark/conf/SparkConfigParser.java b/metadata-integration/java/acryl-spark-lineage/src/main/java/datahub/spark/conf/SparkConfigParser.java index 45ec5365d09b3..3860285083c4b 100644 --- a/metadata-integration/java/acryl-spark-lineage/src/main/java/datahub/spark/conf/SparkConfigParser.java +++ b/metadata-integration/java/acryl-spark-lineage/src/main/java/datahub/spark/conf/SparkConfigParser.java @@ -30,6 +30,8 @@ public class SparkConfigParser { public static final String GMS_AUTH_TOKEN = "rest.token"; public static final String FILE_EMITTER_FILE_NAME = "file.filename"; public static final String DISABLE_SSL_VERIFICATION_KEY = "rest.disable_ssl_verification"; + public static final String REST_DISABLE_CHUNKED_ENCODING = "rest.disable_chunked_encoding"; + public static final String MAX_RETRIES = "rest.max_retries"; public static final String RETRY_INTERVAL_IN_SEC = "rest.retry_interval_in_sec"; public static final String KAFKA_MCP_TOPIC = "kafka.mcp_topic"; diff --git a/metadata-integration/java/acryl-spark-lineage/src/main/java/io/openlineage/spark/agent/util/PlanUtils.java b/metadata-integration/java/acryl-spark-lineage/src/main/java/io/openlineage/spark/agent/util/PlanUtils.java index d46d741d155b8..5f87df2a65d6c 100644 --- a/metadata-integration/java/acryl-spark-lineage/src/main/java/io/openlineage/spark/agent/util/PlanUtils.java +++ b/metadata-integration/java/acryl-spark-lineage/src/main/java/io/openlineage/spark/agent/util/PlanUtils.java @@ -5,14 +5,13 @@ package io.openlineage.spark.agent.util; -import static io.openlineage.spark.agent.lifecycle.ExecutionContext.CAMEL_TO_SNAKE_CASE; - import com.typesafe.config.Config; import com.typesafe.config.ConfigFactory; import datahub.spark.conf.SparkLineageConf; import io.datahubproject.openlineage.dataset.HdfsPathDataset; import io.openlineage.client.OpenLineage; import io.openlineage.spark.agent.Versions; +import io.openlineage.spark.api.naming.NameNormalizer; import java.io.IOException; import java.net.URI; import java.net.URISyntaxException; @@ -21,7 +20,6 @@ import java.util.Collection; import java.util.Collections; import java.util.List; -import java.util.Locale; import java.util.Objects; import java.util.Optional; import java.util.UUID; @@ -186,7 +184,7 @@ public static OpenLineage.ParentRunFacet parentRunFacet( .run(new OpenLineage.ParentRunFacetRunBuilder().runId(parentRunId).build()) .job( new OpenLineage.ParentRunFacetJobBuilder() - .name(parentJob.replaceAll(CAMEL_TO_SNAKE_CASE, "_$1").toLowerCase(Locale.ROOT)) + .name(NameNormalizer.normalize(parentJob)) .namespace(parentJobNamespace) .build()) .build(); @@ -287,8 
+285,6 @@ public static boolean safeIsDefinedAt(PartialFunction pfn, Object x) { * @param pfn * @param x * @return - * @param - * @param */ public static List safeApply(PartialFunction> pfn, D x) { try { diff --git a/metadata-integration/java/acryl-spark-lineage/src/main/java/io/openlineage/spark/agent/util/RddPathUtils.java b/metadata-integration/java/acryl-spark-lineage/src/main/java/io/openlineage/spark/agent/util/RddPathUtils.java index 62005bf15f850..6ef7403362a90 100644 --- a/metadata-integration/java/acryl-spark-lineage/src/main/java/io/openlineage/spark/agent/util/RddPathUtils.java +++ b/metadata-integration/java/acryl-spark-lineage/src/main/java/io/openlineage/spark/agent/util/RddPathUtils.java @@ -7,6 +7,7 @@ import java.util.Arrays; import java.util.Objects; +import java.util.concurrent.atomic.AtomicBoolean; import java.util.stream.Stream; import lombok.extern.slf4j.Slf4j; import org.apache.commons.lang3.reflect.FieldUtils; @@ -18,6 +19,7 @@ import org.apache.spark.rdd.MapPartitionsRDD; import org.apache.spark.rdd.ParallelCollectionRDD; import org.apache.spark.rdd.RDD; +import org.apache.spark.sql.execution.datasources.FilePartition; import org.apache.spark.sql.execution.datasources.FileScanRDD; import scala.Tuple2; import scala.collection.immutable.Seq; @@ -90,7 +92,7 @@ public boolean isDefinedAt(Object rdd) { @SuppressWarnings("PMD.AvoidLiteralsInIfCondition") public Stream extract(FileScanRDD rdd) { return ScalaConversionUtils.fromSeq(rdd.filePartitions()).stream() - .flatMap(fp -> Arrays.stream(fp.files())) + .flatMap((FilePartition fp) -> Arrays.stream(fp.files())) .map( f -> { if ("3.4".compareTo(package$.MODULE$.SPARK_VERSION()) <= 0) { @@ -115,11 +117,15 @@ public boolean isDefinedAt(Object rdd) { @Override public Stream extract(ParallelCollectionRDD rdd) { + int SEQ_LIMIT = 1000; + AtomicBoolean loggingDone = new AtomicBoolean(false); try { Object data = FieldUtils.readField(rdd, "data", true); log.debug("ParallelCollectionRDD data: {}", data); - if (data instanceof Seq) { - return ScalaConversionUtils.fromSeq((Seq) data).stream() + if ((data instanceof Seq) && ((Seq) data).head() instanceof Tuple2) { + // exit if the first element is invalid + Seq data_slice = (Seq) ((Seq) data).slice(0, SEQ_LIMIT); + return ScalaConversionUtils.fromSeq(data_slice).stream() .map( el -> { Path path = null; @@ -127,9 +133,9 @@ public Stream extract(ParallelCollectionRDD rdd) { // we're able to extract path path = parentOf(((Tuple2) el)._1.toString()); log.debug("Found input {}", path); - } else { - // Change to debug to silence error - log.debug("unable to extract Path from {}", el.getClass().getCanonicalName()); + } else if (!loggingDone.get()) { + log.warn("unable to extract Path from {}", el.getClass().getCanonicalName()); + loggingDone.set(true); } return path; }) diff --git a/metadata-integration/java/datahub-client/build.gradle b/metadata-integration/java/datahub-client/build.gradle index d9087347e1b5c..1bdc848d0385b 100644 --- a/metadata-integration/java/datahub-client/build.gradle +++ b/metadata-integration/java/datahub-client/build.gradle @@ -1,6 +1,6 @@ plugins { id("com.palantir.git-version") apply false - id 'java' + id 'java-library' id 'com.github.johnrengelman.shadow' id 'jacoco' id 'signing' @@ -12,11 +12,13 @@ apply from: "../versioning.gradle" import org.apache.tools.ant.filters.ReplaceTokens -jar.enabled = false // Since we only want to build shadow jars, disabling the regular jar creation +jar { + archiveClassifier = "lib" +} dependencies { - implementation 
project(':entity-registry') - implementation project(':metadata-integration:java:datahub-event') + api project(':entity-registry') + api project(':metadata-integration:java:datahub-event') implementation(externalDependency.kafkaAvroSerializer) { exclude group: "org.apache.avro" } @@ -33,7 +35,7 @@ dependencies { implementation externalDependency.jacksonDataBind runtimeOnly externalDependency.jna - implementation externalDependency.slf4jApi + api externalDependency.slf4jApi compileOnly externalDependency.lombok annotationProcessor externalDependency.lombok // VisibleForTesting @@ -78,6 +80,11 @@ shadowJar { // https://github.com/johnrengelman/shadow/issues/729 exclude('module-info.class', 'META-INF/versions/**', '**/LICENSE', '**/LICENSE*.txt', '**/NOTICE', '**/NOTICE.txt', 'licenses/**', 'log4j2.*', 'log4j.*') + dependencies { + exclude(dependency("org.slf4j::")) + exclude(dependency("antlr::")) + exclude("org/apache/commons/logging/**") + } mergeServiceFiles() // we relocate namespaces manually, because we want to know exactly which libs we are exposing and why // we can move to automatic relocation using ConfigureShadowRelocation after we get to a good place on these first @@ -88,15 +95,20 @@ shadowJar { relocate 'javassist', 'datahub.shaded.javassist' relocate 'edu.umd.cs.findbugs', 'datahub.shaded.findbugs' relocate 'org.antlr', 'datahub.shaded.org.antlr' - relocate 'antlr', 'datahub.shaded.antlr' + //relocate 'antlr', 'datahub.shaded.antlr' relocate 'com.google.common', 'datahub.shaded.com.google.common' - relocate 'org.apache.commons', 'datahub.shaded.org.apache.commons' + relocate 'org.apache.commons.codec', 'datahub.shaded.org.apache.commons.codec' + relocate 'org.apache.commons.compress', 'datahub.shaded.org.apache.commons.compress' + relocate 'org.apache.commons.lang3', 'datahub.shaded.org.apache.commons.lang3' + relocate 'org.apache.commons.lang', 'datahub.shaded.org.apache.commons.lang' + relocate 'org.apache.commons.cli', 'datahub.shaded.org.apache.commons.cli' + relocate 'org.apache.commons.text', 'datahub.shaded.org.apache.commons.text' + relocate 'org.apache.commons.io', 'datahub.shaded.org.apache.commons.io' relocate 'org.apache.maven', 'datahub.shaded.org.apache.maven' relocate 'org.reflections', 'datahub.shaded.org.reflections' relocate 'st4hidden', 'datahub.shaded.st4hidden' relocate 'org.stringtemplate', 'datahub.shaded.org.stringtemplate' relocate 'org.abego.treelayout', 'datahub.shaded.treelayout' - relocate 'org.slf4j', 'datahub.shaded.slf4j' relocate 'javax.annotation', 'datahub.shaded.javax.annotation' relocate 'com.github.benmanes.caffeine', 'datahub.shaded.com.github.benmanes.caffeine' relocate 'org.checkerframework', 'datahub.shaded.org.checkerframework' diff --git a/metadata-integration/java/datahub-client/src/main/java/datahub/client/rest/DatahubHttpRequestRetryStrategy.java b/metadata-integration/java/datahub-client/src/main/java/datahub/client/rest/DatahubHttpRequestRetryStrategy.java index 71a4b93baf48f..50c0277c98b03 100644 --- a/metadata-integration/java/datahub-client/src/main/java/datahub/client/rest/DatahubHttpRequestRetryStrategy.java +++ b/metadata-integration/java/datahub-client/src/main/java/datahub/client/rest/DatahubHttpRequestRetryStrategy.java @@ -48,7 +48,6 @@ public boolean retryRequest( @Override public boolean retryRequest(HttpResponse response, int execCount, HttpContext context) { - log.warn("Retrying request due to error: {}", response); return super.retryRequest(response, execCount, context); } } diff --git 
a/metadata-integration/java/datahub-client/src/main/java/datahub/client/rest/RestEmitter.java b/metadata-integration/java/datahub-client/src/main/java/datahub/client/rest/RestEmitter.java index e1017372be124..d70c5baf10879 100644 --- a/metadata-integration/java/datahub-client/src/main/java/datahub/client/rest/RestEmitter.java +++ b/metadata-integration/java/datahub-client/src/main/java/datahub/client/rest/RestEmitter.java @@ -1,6 +1,7 @@ package datahub.client.rest; import static com.linkedin.metadata.Constants.*; +import static org.apache.hc.core5.http.HttpHeaders.*; import com.fasterxml.jackson.annotation.JsonInclude; import com.fasterxml.jackson.core.StreamReadConstraints; @@ -18,6 +19,7 @@ import datahub.event.UpsertAspectRequest; import java.io.ByteArrayOutputStream; import java.io.IOException; +import java.nio.charset.StandardCharsets; import java.security.KeyManagementException; import java.security.KeyStoreException; import java.security.NoSuchAlgorithmException; @@ -26,6 +28,7 @@ import java.util.concurrent.CountDownLatch; import java.util.concurrent.ExecutionException; import java.util.concurrent.Future; +import java.util.concurrent.TimeUnit; import java.util.concurrent.atomic.AtomicReference; import java.util.function.Consumer; import javax.annotation.concurrent.ThreadSafe; @@ -97,17 +100,20 @@ public RestEmitter(RestEmitterConfig config) { this.config = config; HttpAsyncClientBuilder httpClientBuilder = this.config.getAsyncHttpClientBuilder(); httpClientBuilder.setRetryStrategy(new DatahubHttpRequestRetryStrategy()); - - // Override httpClient settings with RestEmitter configs if present - if (config.getTimeoutSec() != null) { - httpClientBuilder.setDefaultRequestConfig( - RequestConfig.custom() - .setConnectionRequestTimeout( - config.getTimeoutSec() * 1000, java.util.concurrent.TimeUnit.MILLISECONDS) - .setResponseTimeout( - config.getTimeoutSec() * 1000, java.util.concurrent.TimeUnit.MILLISECONDS) - .build()); + if ((config.getTimeoutSec() != null) || (config.isDisableChunkedEncoding())) { + RequestConfig.Builder requestConfigBuilder = RequestConfig.custom(); + // Override httpClient settings with RestEmitter configs if present + if (config.getTimeoutSec() != null) { + requestConfigBuilder + .setConnectionRequestTimeout(config.getTimeoutSec() * 1000, TimeUnit.MILLISECONDS) + .setResponseTimeout(config.getTimeoutSec() * 1000, TimeUnit.MILLISECONDS); + } + if (config.isDisableChunkedEncoding()) { + requestConfigBuilder.setContentCompressionEnabled(false); + } + httpClientBuilder.setDefaultRequestConfig(requestConfigBuilder.build()); } + PoolingAsyncClientConnectionManagerBuilder poolingAsyncClientConnectionManagerBuilder = PoolingAsyncClientConnectionManagerBuilder.create(); @@ -223,8 +229,13 @@ private Future postGeneric( if (this.config.getToken() != null) { simpleRequestBuilder.setHeader("Authorization", "Bearer " + this.config.getToken()); } + if (this.config.isDisableChunkedEncoding()) { + byte[] payloadBytes = payloadJson.getBytes(StandardCharsets.UTF_8); + simpleRequestBuilder.setBody(payloadBytes, ContentType.APPLICATION_JSON); + } else { + simpleRequestBuilder.setBody(payloadJson, ContentType.APPLICATION_JSON); + } - simpleRequestBuilder.setBody(payloadJson, ContentType.APPLICATION_JSON); AtomicReference responseAtomicReference = new AtomicReference<>(); CountDownLatch responseLatch = new CountDownLatch(1); FutureCallback httpCallback = diff --git a/metadata-integration/java/datahub-client/src/main/java/datahub/client/rest/RestEmitterConfig.java 
b/metadata-integration/java/datahub-client/src/main/java/datahub/client/rest/RestEmitterConfig.java index e28ad4ed660f0..55c11aab0ebf3 100644 --- a/metadata-integration/java/datahub-client/src/main/java/datahub/client/rest/RestEmitterConfig.java +++ b/metadata-integration/java/datahub-client/src/main/java/datahub/client/rest/RestEmitterConfig.java @@ -30,6 +30,8 @@ public class RestEmitterConfig { Integer timeoutSec; @Builder.Default boolean disableSslVerification = false; + @Builder.Default boolean disableChunkedEncoding = false; + @Builder.Default int maxRetries = 0; @Builder.Default int retryIntervalSec = 10; diff --git a/metadata-integration/java/openlineage-converter/build.gradle b/metadata-integration/java/openlineage-converter/build.gradle index 2e04881ab5ccd..d149104f089b3 100644 --- a/metadata-integration/java/openlineage-converter/build.gradle +++ b/metadata-integration/java/openlineage-converter/build.gradle @@ -1,4 +1,4 @@ -apply plugin: 'java' +apply plugin: 'java-library' apply plugin: 'com.github.johnrengelman.shadow' apply plugin: 'signing' apply plugin: 'maven-publish' diff --git a/metadata-jobs/mae-consumer/src/main/java/com/linkedin/metadata/kafka/MCLKafkaListenerRegistrar.java b/metadata-jobs/mae-consumer/src/main/java/com/linkedin/metadata/kafka/MCLKafkaListenerRegistrar.java index fb2880f617d30..c909b0034a912 100644 --- a/metadata-jobs/mae-consumer/src/main/java/com/linkedin/metadata/kafka/MCLKafkaListenerRegistrar.java +++ b/metadata-jobs/mae-consumer/src/main/java/com/linkedin/metadata/kafka/MCLKafkaListenerRegistrar.java @@ -1,5 +1,7 @@ package com.linkedin.metadata.kafka; +import static com.linkedin.metadata.config.kafka.KafkaConfiguration.MCL_EVENT_CONSUMER_NAME; + import com.linkedin.metadata.kafka.config.MetadataChangeLogProcessorCondition; import com.linkedin.metadata.kafka.hook.MetadataChangeLogHook; import com.linkedin.mxe.Topics; @@ -39,7 +41,7 @@ public class MCLKafkaListenerRegistrar implements InitializingBean { @Autowired private KafkaListenerEndpointRegistry kafkaListenerEndpointRegistry; @Autowired - @Qualifier("kafkaEventConsumer") + @Qualifier(MCL_EVENT_CONSUMER_NAME) private KafkaListenerContainerFactory kafkaListenerContainerFactory; @Value("${METADATA_CHANGE_LOG_KAFKA_CONSUMER_GROUP_ID:generic-mae-consumer-job-client}") diff --git a/metadata-jobs/mce-consumer/src/main/java/com/linkedin/metadata/kafka/MetadataChangeEventsProcessor.java b/metadata-jobs/mce-consumer/src/main/java/com/linkedin/metadata/kafka/MetadataChangeEventsProcessor.java index 1b3d19915b439..5d2f6452e6919 100644 --- a/metadata-jobs/mce-consumer/src/main/java/com/linkedin/metadata/kafka/MetadataChangeEventsProcessor.java +++ b/metadata-jobs/mce-consumer/src/main/java/com/linkedin/metadata/kafka/MetadataChangeEventsProcessor.java @@ -1,5 +1,7 @@ package com.linkedin.metadata.kafka; +import static com.linkedin.metadata.config.kafka.KafkaConfiguration.DEFAULT_EVENT_CONSUMER_NAME; + import com.codahale.metrics.Histogram; import com.codahale.metrics.MetricRegistry; import com.codahale.metrics.Timer; @@ -60,7 +62,7 @@ public class MetadataChangeEventsProcessor { "${METADATA_CHANGE_EVENT_NAME:${KAFKA_MCE_TOPIC_NAME:" + Topics.METADATA_CHANGE_EVENT + "}}", - containerFactory = "kafkaEventConsumer") + containerFactory = DEFAULT_EVENT_CONSUMER_NAME) @Deprecated public void consume(final ConsumerRecord consumerRecord) { try (Timer.Context i = MetricUtils.timer(this.getClass(), "consume").time()) { diff --git 
a/metadata-jobs/mce-consumer/src/main/java/com/linkedin/metadata/kafka/MetadataChangeProposalsProcessor.java b/metadata-jobs/mce-consumer/src/main/java/com/linkedin/metadata/kafka/MetadataChangeProposalsProcessor.java index 22c2b4b9c0450..ef87afdef46cb 100644 --- a/metadata-jobs/mce-consumer/src/main/java/com/linkedin/metadata/kafka/MetadataChangeProposalsProcessor.java +++ b/metadata-jobs/mce-consumer/src/main/java/com/linkedin/metadata/kafka/MetadataChangeProposalsProcessor.java @@ -4,6 +4,7 @@ import static com.linkedin.metadata.Constants.MDC_CHANGE_TYPE; import static com.linkedin.metadata.Constants.MDC_ENTITY_TYPE; import static com.linkedin.metadata.Constants.MDC_ENTITY_URN; +import static com.linkedin.metadata.config.kafka.KafkaConfiguration.MCP_EVENT_CONSUMER_NAME; import com.codahale.metrics.Histogram; import com.codahale.metrics.MetricRegistry; @@ -116,7 +117,7 @@ public void registerConsumerThrottle() { @KafkaListener( id = CONSUMER_GROUP_ID_VALUE, topics = "${METADATA_CHANGE_PROPOSAL_TOPIC_NAME:" + Topics.METADATA_CHANGE_PROPOSAL + "}", - containerFactory = "kafkaEventConsumer") + containerFactory = MCP_EVENT_CONSUMER_NAME) public void consume(final ConsumerRecord consumerRecord) { try (Timer.Context ignored = MetricUtils.timer(this.getClass(), "consume").time()) { kafkaLagStats.update(System.currentTimeMillis() - consumerRecord.timestamp()); diff --git a/metadata-jobs/pe-consumer/src/main/java/com/datahub/event/PlatformEventProcessor.java b/metadata-jobs/pe-consumer/src/main/java/com/datahub/event/PlatformEventProcessor.java index 358a2ac0c2ee3..5d11697bed93d 100644 --- a/metadata-jobs/pe-consumer/src/main/java/com/datahub/event/PlatformEventProcessor.java +++ b/metadata-jobs/pe-consumer/src/main/java/com/datahub/event/PlatformEventProcessor.java @@ -1,5 +1,7 @@ package com.datahub.event; +import static com.linkedin.metadata.config.kafka.KafkaConfiguration.PE_EVENT_CONSUMER_NAME; + import com.codahale.metrics.Histogram; import com.codahale.metrics.MetricRegistry; import com.codahale.metrics.Timer; @@ -56,7 +58,7 @@ public PlatformEventProcessor( @KafkaListener( id = "${PLATFORM_EVENT_KAFKA_CONSUMER_GROUP_ID:generic-platform-event-job-client}", topics = {"${PLATFORM_EVENT_TOPIC_NAME:" + Topics.PLATFORM_EVENT + "}"}, - containerFactory = "kafkaEventConsumer") + containerFactory = PE_EVENT_CONSUMER_NAME) public void consume(final ConsumerRecord consumerRecord) { try (Timer.Context i = MetricUtils.timer(this.getClass(), "consume").time()) { diff --git a/metadata-service/configuration/src/main/java/com/linkedin/metadata/config/kafka/ConsumerConfiguration.java b/metadata-service/configuration/src/main/java/com/linkedin/metadata/config/kafka/ConsumerConfiguration.java index 60f3e1b4fef76..9b476483a2baf 100644 --- a/metadata-service/configuration/src/main/java/com/linkedin/metadata/config/kafka/ConsumerConfiguration.java +++ b/metadata-service/configuration/src/main/java/com/linkedin/metadata/config/kafka/ConsumerConfiguration.java @@ -8,4 +8,13 @@ public class ConsumerConfiguration { private int maxPartitionFetchBytes; private boolean stopOnDeserializationError; private boolean healthCheckEnabled; + + private ConsumerOptions mcp; + private ConsumerOptions mcl; + private ConsumerOptions pe; + + @Data + public static class ConsumerOptions { + private String autoOffsetReset; + } } diff --git a/metadata-service/configuration/src/main/java/com/linkedin/metadata/config/kafka/KafkaConfiguration.java 
b/metadata-service/configuration/src/main/java/com/linkedin/metadata/config/kafka/KafkaConfiguration.java index b03aedc1a7b5e..ae0d3a3bb4647 100644 --- a/metadata-service/configuration/src/main/java/com/linkedin/metadata/config/kafka/KafkaConfiguration.java +++ b/metadata-service/configuration/src/main/java/com/linkedin/metadata/config/kafka/KafkaConfiguration.java @@ -20,6 +20,10 @@ public class KafkaConfiguration { "spring.deserializer.key.delegate.class"; public static final String VALUE_DESERIALIZER_DELEGATE_CLASS = "spring.deserializer.value.delegate.class"; + public static final String MCP_EVENT_CONSUMER_NAME = "mcpEventConsumer"; + public static final String MCL_EVENT_CONSUMER_NAME = "mclEventConsumer"; + public static final String PE_EVENT_CONSUMER_NAME = "platformEventConsumer"; + public static final String DEFAULT_EVENT_CONSUMER_NAME = "kafkaEventConsumer"; private String bootstrapServers; diff --git a/metadata-service/configuration/src/main/resources/application.yaml b/metadata-service/configuration/src/main/resources/application.yaml index 8010ae187b6c8..4945b36a251c2 100644 --- a/metadata-service/configuration/src/main/resources/application.yaml +++ b/metadata-service/configuration/src/main/resources/application.yaml @@ -289,6 +289,13 @@ kafka: maxPartitionFetchBytes: ${KAFKA_CONSUMER_MAX_PARTITION_FETCH_BYTES:5242880} # the max bytes consumed per partition stopOnDeserializationError: ${KAFKA_CONSUMER_STOP_ON_DESERIALIZATION_ERROR:true} # Stops kafka listener container on deserialization error, allows user to fix problems before moving past problematic offset. If false will log and move forward past the offset healthCheckEnabled: ${KAFKA_CONSUMER_HEALTH_CHECK_ENABLED:true} # Sets the health indicator to down when a message listener container has stopped due to a deserialization failure, will force consumer apps to restart through k8s and docker-compose health mechanisms + mcp: + autoOffsetReset: ${KAFKA_CONSUMER_MCP_AUTO_OFFSET_RESET:earliest} + mcl: + autoOffsetReset: ${KAFKA_CONSUMER_MCL_AUTO_OFFSET_RESET:earliest} + pe: + autoOffsetReset: ${KAFKA_CONSUMER_PE_AUTO_OFFSET_RESET:latest} + schemaRegistry: type: ${SCHEMA_REGISTRY_TYPE:KAFKA} # INTERNAL or KAFKA or AWS_GLUE url: ${KAFKA_SCHEMAREGISTRY_URL:http://localhost:8081} diff --git a/metadata-service/factories/src/main/java/com/linkedin/gms/factory/kafka/KafkaEventConsumerFactory.java b/metadata-service/factories/src/main/java/com/linkedin/gms/factory/kafka/KafkaEventConsumerFactory.java index 750af8ec488df..a1ee4df360b7e 100644 --- a/metadata-service/factories/src/main/java/com/linkedin/gms/factory/kafka/KafkaEventConsumerFactory.java +++ b/metadata-service/factories/src/main/java/com/linkedin/gms/factory/kafka/KafkaEventConsumerFactory.java @@ -1,10 +1,18 @@ package com.linkedin.gms.factory.kafka; +import static com.linkedin.metadata.config.kafka.KafkaConfiguration.DEFAULT_EVENT_CONSUMER_NAME; +import static com.linkedin.metadata.config.kafka.KafkaConfiguration.MCL_EVENT_CONSUMER_NAME; +import static com.linkedin.metadata.config.kafka.KafkaConfiguration.MCP_EVENT_CONSUMER_NAME; +import static com.linkedin.metadata.config.kafka.KafkaConfiguration.PE_EVENT_CONSUMER_NAME; + import com.linkedin.gms.factory.config.ConfigurationProvider; +import com.linkedin.metadata.config.kafka.ConsumerConfiguration; import com.linkedin.metadata.config.kafka.KafkaConfiguration; import java.time.Duration; import java.util.Arrays; +import java.util.HashMap; import java.util.Map; +import javax.annotation.Nullable; import 
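The yaml above only adds three environment-driven keys, so once Spring binds them the per-consumer behaviour can be read straight off ConsumerConfiguration. A minimal sketch, assuming a populated ConfigurationProvider bean:

import com.linkedin.gms.factory.config.ConfigurationProvider;
import com.linkedin.metadata.config.kafka.ConsumerConfiguration;

public class ConsumerOffsetResetExample {
  // With no env overrides, MCP/MCL resolve to "earliest" and PE to "latest",
  // matching the application.yaml defaults shown above.
  public static void print(ConfigurationProvider configurationProvider) {
    ConsumerConfiguration consumer = configurationProvider.getKafka().getConsumer();
    System.out.println(consumer.getMcp().getAutoOffsetReset()); // KAFKA_CONSUMER_MCP_AUTO_OFFSET_RESET, default "earliest"
    System.out.println(consumer.getMcl().getAutoOffsetReset()); // KAFKA_CONSUMER_MCL_AUTO_OFFSET_RESET, default "earliest"
    System.out.println(consumer.getPe().getAutoOffsetReset());  // KAFKA_CONSUMER_PE_AUTO_OFFSET_RESET, default "latest"
  }
}

Only the platform-event consumer defaults to latest, presumably so a brand-new consumer group does not replay the platform-event backlog.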
lombok.extern.slf4j.Slf4j; import org.apache.avro.generic.GenericRecord; import org.apache.kafka.clients.consumer.ConsumerConfig; @@ -23,7 +31,6 @@ @Slf4j @Configuration public class KafkaEventConsumerFactory { - private int kafkaEventConsumerConcurrency; @Bean(name = "kafkaConsumerFactory") @@ -87,15 +94,82 @@ private static Map buildCustomizedProperties( return customizedProperties; } - @Bean(name = "kafkaEventConsumer") + @Bean(name = PE_EVENT_CONSUMER_NAME) + protected KafkaListenerContainerFactory platformEventConsumer( + @Qualifier("kafkaConsumerFactory") + DefaultKafkaConsumerFactory kafkaConsumerFactory, + @Qualifier("configurationProvider") ConfigurationProvider configurationProvider) { + + return buildDefaultKafkaListenerContainerFactory( + PE_EVENT_CONSUMER_NAME, + kafkaConsumerFactory, + configurationProvider.getKafka().getConsumer().isStopOnDeserializationError(), + configurationProvider.getKafka().getConsumer().getPe()); + } + + @Bean(name = MCP_EVENT_CONSUMER_NAME) + protected KafkaListenerContainerFactory mcpEventConsumer( + @Qualifier("kafkaConsumerFactory") + DefaultKafkaConsumerFactory kafkaConsumerFactory, + @Qualifier("configurationProvider") ConfigurationProvider configurationProvider) { + + return buildDefaultKafkaListenerContainerFactory( + MCP_EVENT_CONSUMER_NAME, + kafkaConsumerFactory, + configurationProvider.getKafka().getConsumer().isStopOnDeserializationError(), + configurationProvider.getKafka().getConsumer().getMcp()); + } + + @Bean(name = MCL_EVENT_CONSUMER_NAME) + protected KafkaListenerContainerFactory mclEventConsumer( + @Qualifier("kafkaConsumerFactory") + DefaultKafkaConsumerFactory kafkaConsumerFactory, + @Qualifier("configurationProvider") ConfigurationProvider configurationProvider) { + + return buildDefaultKafkaListenerContainerFactory( + MCL_EVENT_CONSUMER_NAME, + kafkaConsumerFactory, + configurationProvider.getKafka().getConsumer().isStopOnDeserializationError(), + configurationProvider.getKafka().getConsumer().getMcl()); + } + + @Bean(name = DEFAULT_EVENT_CONSUMER_NAME) protected KafkaListenerContainerFactory kafkaEventConsumer( @Qualifier("kafkaConsumerFactory") DefaultKafkaConsumerFactory kafkaConsumerFactory, @Qualifier("configurationProvider") ConfigurationProvider configurationProvider) { + return buildDefaultKafkaListenerContainerFactory( + DEFAULT_EVENT_CONSUMER_NAME, + kafkaConsumerFactory, + configurationProvider.getKafka().getConsumer().isStopOnDeserializationError(), + null); + } + + private KafkaListenerContainerFactory buildDefaultKafkaListenerContainerFactory( + String consumerFactoryName, + DefaultKafkaConsumerFactory kafkaConsumerFactory, + boolean isStopOnDeserializationError, + @Nullable ConsumerConfiguration.ConsumerOptions consumerOptions) { + + final DefaultKafkaConsumerFactory factoryWithOverrides; + if (consumerOptions != null) { + // Copy the base config + Map props = new HashMap<>(kafkaConsumerFactory.getConfigurationProperties()); + // Override just the auto.offset.reset + props.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, consumerOptions.getAutoOffsetReset()); + factoryWithOverrides = + new DefaultKafkaConsumerFactory<>( + props, + kafkaConsumerFactory.getKeyDeserializer(), + kafkaConsumerFactory.getValueDeserializer()); + } else { + factoryWithOverrides = kafkaConsumerFactory; + } + ConcurrentKafkaListenerContainerFactory factory = new ConcurrentKafkaListenerContainerFactory<>(); - factory.setConsumerFactory(kafkaConsumerFactory); + factory.setConsumerFactory(factoryWithOverrides); factory.setContainerCustomizer(new 
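buildDefaultKafkaListenerContainerFactory never mutates the shared kafkaConsumerFactory: when per-consumer options are present it copies the base properties, overrides only auto.offset.reset, and builds the container factory from the copy. A stripped-down sketch of that pattern, with concurrency, the thread-pool customizer, and error handling omitted:

import java.util.HashMap;
import java.util.Map;
import org.apache.avro.generic.GenericRecord;
import org.apache.kafka.clients.consumer.ConsumerConfig;
import org.springframework.kafka.config.ConcurrentKafkaListenerContainerFactory;
import org.springframework.kafka.core.DefaultKafkaConsumerFactory;

public class OffsetResetOverrideExample {
  // Copy the shared consumer config, swap only auto.offset.reset, and hang a
  // listener container factory off the copy so other consumers are untouched.
  static ConcurrentKafkaListenerContainerFactory<String, GenericRecord> withOffsetReset(
      DefaultKafkaConsumerFactory<String, GenericRecord> base, String autoOffsetReset) {
    Map<String, Object> props = new HashMap<>(base.getConfigurationProperties());
    props.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, autoOffsetReset);
    DefaultKafkaConsumerFactory<String, GenericRecord> copy =
        new DefaultKafkaConsumerFactory<>(
            props, base.getKeyDeserializer(), base.getValueDeserializer());
    ConcurrentKafkaListenerContainerFactory<String, GenericRecord> factory =
        new ConcurrentKafkaListenerContainerFactory<>();
    factory.setConsumerFactory(copy);
    return factory;
  }
}

Cloning rather than mutating keeps the default kafkaEventConsumer bean identical for any listener that has not migrated to one of the new factory names.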
ThreadPoolContainerCustomizer()); factory.setConcurrency(kafkaEventConsumerConcurrency); @@ -103,7 +177,7 @@ protected KafkaListenerContainerFactory kafkaEventConsumer( use DefaultErrorHandler (does back-off retry and then logs) rather than stopping the container. Stopping the container prevents lost messages until the error can be examined, disabling this will allow progress, but may lose data */ - if (configurationProvider.getKafka().getConsumer().isStopOnDeserializationError()) { + if (isStopOnDeserializationError) { CommonDelegatingErrorHandler delegatingErrorHandler = new CommonDelegatingErrorHandler(new DefaultErrorHandler()); delegatingErrorHandler.addDelegate( @@ -111,9 +185,9 @@ use DefaultErrorHandler (does back-off retry and then logs) rather than stopping factory.setCommonErrorHandler(delegatingErrorHandler); } log.info( - String.format( - "Event-based KafkaListenerContainerFactory built successfully. Consumer concurrency = %s", - kafkaEventConsumerConcurrency)); + "Event-based {} KafkaListenerContainerFactory built successfully. Consumer concurrency = {}", + consumerFactoryName, + kafkaEventConsumerConcurrency); return factory; } diff --git a/metadata-service/schema-registry-servlet/src/test/java/io/datahubproject/openapi/test/SchemaRegistryControllerTest.java b/metadata-service/schema-registry-servlet/src/test/java/io/datahubproject/openapi/test/SchemaRegistryControllerTest.java index 664766f204e46..e8deed00672da 100644 --- a/metadata-service/schema-registry-servlet/src/test/java/io/datahubproject/openapi/test/SchemaRegistryControllerTest.java +++ b/metadata-service/schema-registry-servlet/src/test/java/io/datahubproject/openapi/test/SchemaRegistryControllerTest.java @@ -1,6 +1,7 @@ package io.datahubproject.openapi.test; import static com.linkedin.metadata.Constants.*; +import static com.linkedin.metadata.config.kafka.KafkaConfiguration.DEFAULT_EVENT_CONSUMER_NAME; import static org.testng.Assert.*; import com.linkedin.common.urn.Urn; @@ -199,7 +200,7 @@ public void testPEConsumption() @KafkaListener( id = "test-mcp-consumer", topics = Topics.METADATA_CHANGE_PROPOSAL, - containerFactory = "kafkaEventConsumer", + containerFactory = DEFAULT_EVENT_CONSUMER_NAME, properties = {"auto.offset.reset:earliest"}) public void receiveMCP(ConsumerRecord consumerRecord) { @@ -216,7 +217,7 @@ public void receiveMCP(ConsumerRecord consumerRecord) { @KafkaListener( id = "test-mcl-consumer", topics = Topics.METADATA_CHANGE_LOG_VERSIONED, - containerFactory = "kafkaEventConsumer", + containerFactory = DEFAULT_EVENT_CONSUMER_NAME, properties = {"auto.offset.reset:earliest"}) public void receiveMCL(ConsumerRecord consumerRecord) { @@ -232,7 +233,7 @@ public void receiveMCL(ConsumerRecord consumerRecord) { @KafkaListener( id = "test-pe-consumer", topics = Topics.PLATFORM_EVENT, - containerFactory = "kafkaEventConsumer", + containerFactory = DEFAULT_EVENT_CONSUMER_NAME, properties = {"auto.offset.reset:earliest"}) public void receivePE(ConsumerRecord consumerRecord) {
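Worth noting: the test listeners keep properties = {"auto.offset.reset:earliest"} after switching to the shared constant; in spring-kafka, listener-level properties supersede the consumer factory configuration, so these tests replay from the beginning regardless of the new yaml defaults. A hedged sketch of that pattern with a placeholder test consumer:

import static com.linkedin.metadata.config.kafka.KafkaConfiguration.DEFAULT_EVENT_CONSUMER_NAME;

import com.linkedin.mxe.Topics;
import org.apache.avro.generic.GenericRecord;
import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.springframework.kafka.annotation.KafkaListener;

public class ExampleTestListener {
  // The listener-level property overrides the factory default, so this consumer
  // always starts from the earliest offset, whatever the per-consumer yaml says.
  @KafkaListener(
      id = "example-test-pe-consumer", // placeholder id
      topics = Topics.PLATFORM_EVENT,
      containerFactory = DEFAULT_EVENT_CONSUMER_NAME,
      properties = {"auto.offset.reset:earliest"})
  public void receive(final ConsumerRecord<String, GenericRecord> record) {
    // assertions would go here in a real test
  }
}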