From 97e02299f9c7e541ea8d902a081f58921c0f0952 Mon Sep 17 00:00:00 2001 From: david-leifker <114954101+david-leifker@users.noreply.github.com> Date: Mon, 16 Sep 2024 13:33:48 -0500 Subject: [PATCH 01/12] fix(XServiceProvider): fix ebean framework race condition (#11378) --- .github/workflows/docker-unified.yml | 2 +- docker/datahub-ingestion-base/Dockerfile | 2 ++ .../metadata/restli/RestliServletConfig.java | 5 +++-- .../filter/AuthenticationFilter.java | 1 + .../boot/OnBootApplicationListener.java | 11 +++++++++++ .../restli/server/RAPServletFactory.java | 7 +++++-- .../restli/server/RestliHandlerServlet.java | 14 ++++++++++---- .../gms/WebApplicationInitializer.java | 18 ++++++++++-------- .../gms/servlet/RestliServletConfig.java | 4 ++-- 9 files changed, 45 insertions(+), 19 deletions(-) diff --git a/.github/workflows/docker-unified.yml b/.github/workflows/docker-unified.yml index 075da20cdd11c..9e9a0ed9884b4 100644 --- a/.github/workflows/docker-unified.yml +++ b/.github/workflows/docker-unified.yml @@ -990,7 +990,7 @@ jobs: DATAHUB_VERSION: ${{ needs.setup.outputs.unique_tag }} DATAHUB_ACTIONS_IMAGE: ${{ env.DATAHUB_INGESTION_IMAGE }} ACTIONS_VERSION: ${{ needs.datahub_ingestion_slim_build.outputs.tag || 'head-slim' }} - ACTIONS_EXTRA_PACKAGES: "acryl-datahub-actions[executor]==0.0.13 acryl-datahub-actions==0.0.13 acryl-datahub==0.10.5" + ACTIONS_EXTRA_PACKAGES: "acryl-datahub-actions[executor] acryl-datahub-actions" ACTIONS_CONFIG: "https://raw.githubusercontent.com/acryldata/datahub-actions/main/docker/config/executor.yaml" run: | ./smoke-test/run-quickstart.sh diff --git a/docker/datahub-ingestion-base/Dockerfile b/docker/datahub-ingestion-base/Dockerfile index 92b1762099882..08cf2efdcb6a1 100644 --- a/docker/datahub-ingestion-base/Dockerfile +++ b/docker/datahub-ingestion-base/Dockerfile @@ -43,7 +43,9 @@ RUN apt-get update && apt-get upgrade -y \ krb5-user \ krb5-config \ libkrb5-dev \ + librdkafka-dev \ wget \ + curl \ zip \ unzip \ ldap-utils \ diff --git a/metadata-jobs/mce-consumer-job/src/main/java/com/linkedin/metadata/restli/RestliServletConfig.java b/metadata-jobs/mce-consumer-job/src/main/java/com/linkedin/metadata/restli/RestliServletConfig.java index 269b9a41a89a9..1e16f7d42c6e8 100644 --- a/metadata-jobs/mce-consumer-job/src/main/java/com/linkedin/metadata/restli/RestliServletConfig.java +++ b/metadata-jobs/mce-consumer-job/src/main/java/com/linkedin/metadata/restli/RestliServletConfig.java @@ -2,6 +2,7 @@ import com.datahub.auth.authentication.filter.AuthenticationFilter; import com.linkedin.gms.factory.auth.SystemAuthenticationFactory; +import com.linkedin.r2.transport.http.server.RAPJakartaServlet; import com.linkedin.restli.server.RestliHandlerServlet; import java.util.Collections; import org.springframework.beans.factory.annotation.Qualifier; @@ -36,8 +37,8 @@ public ServletRegistrationBean restliServletRegistration( } @Bean("restliHandlerServlet") - public RestliHandlerServlet restliHandlerServlet() { - return new RestliHandlerServlet(); + public RestliHandlerServlet restliHandlerServlet(final RAPJakartaServlet r2Servlet) { + return new RestliHandlerServlet(r2Servlet); } @Bean diff --git a/metadata-service/auth-filter/src/main/java/com/datahub/auth/authentication/filter/AuthenticationFilter.java b/metadata-service/auth-filter/src/main/java/com/datahub/auth/authentication/filter/AuthenticationFilter.java index ee2efd2ae9536..0a54677eb6149 100644 --- a/metadata-service/auth-filter/src/main/java/com/datahub/auth/authentication/filter/AuthenticationFilter.java +++ b/metadata-service/auth-filter/src/main/java/com/datahub/auth/authentication/filter/AuthenticationFilter.java @@ -77,6 +77,7 @@ public class AuthenticationFilter implements Filter { public void init(FilterConfig filterConfig) throws ServletException { SpringBeanAutowiringSupport.processInjectionBasedOnCurrentContext(this); buildAuthenticatorChain(); + log.info("AuthenticationFilter initialized."); } @Override diff --git a/metadata-service/factories/src/main/java/com/linkedin/metadata/boot/OnBootApplicationListener.java b/metadata-service/factories/src/main/java/com/linkedin/metadata/boot/OnBootApplicationListener.java index ced315ba6e65f..921246fa98f7a 100644 --- a/metadata-service/factories/src/main/java/com/linkedin/metadata/boot/OnBootApplicationListener.java +++ b/metadata-service/factories/src/main/java/com/linkedin/metadata/boot/OnBootApplicationListener.java @@ -73,6 +73,17 @@ public void onApplicationEvent(@Nonnull ContextRefreshedEvent event) { event); String schemaRegistryType = provider.getKafka().getSchemaRegistry().getType(); if (ROOT_WEB_APPLICATION_CONTEXT_ID.equals(event.getApplicationContext().getId())) { + + // Handle race condition, if ebean code is executed while waiting/bootstrapping (i.e. + // AuthenticationFilter) + try { + Class.forName("io.ebean.XServiceProvider"); + } catch (ClassNotFoundException e) { + log.error( + "Failure to initialize required class `io.ebean.XServiceProvider` during initialization."); + throw new RuntimeException(e); + } + if (InternalSchemaRegistryFactory.TYPE.equals(schemaRegistryType)) { executorService.submit(isSchemaRegistryAPIServletReady()); } else { diff --git a/metadata-service/factories/src/main/java/com/linkedin/restli/server/RAPServletFactory.java b/metadata-service/factories/src/main/java/com/linkedin/restli/server/RAPServletFactory.java index e07a25914a039..be060477aeb1f 100644 --- a/metadata-service/factories/src/main/java/com/linkedin/restli/server/RAPServletFactory.java +++ b/metadata-service/factories/src/main/java/com/linkedin/restli/server/RAPServletFactory.java @@ -16,6 +16,7 @@ import lombok.extern.slf4j.Slf4j; import org.springframework.beans.factory.annotation.Qualifier; import org.springframework.beans.factory.annotation.Value; +import org.springframework.context.ApplicationContext; import org.springframework.context.annotation.Bean; import org.springframework.context.annotation.Configuration; @@ -29,8 +30,10 @@ public class RAPServletFactory { private int maxSerializedStringLength; @Bean(name = "restliSpringInjectResourceFactory") - public SpringInjectResourceFactory springInjectResourceFactory() { - return new SpringInjectResourceFactory(); + public SpringInjectResourceFactory springInjectResourceFactory(final ApplicationContext ctx) { + SpringInjectResourceFactory springInjectResourceFactory = new SpringInjectResourceFactory(); + springInjectResourceFactory.setApplicationContext(ctx); + return springInjectResourceFactory; } @Bean("parseqEngineThreads") diff --git a/metadata-service/factories/src/main/java/com/linkedin/restli/server/RestliHandlerServlet.java b/metadata-service/factories/src/main/java/com/linkedin/restli/server/RestliHandlerServlet.java index 7702e8bd6de89..bfc25b7ddaef5 100644 --- a/metadata-service/factories/src/main/java/com/linkedin/restli/server/RestliHandlerServlet.java +++ b/metadata-service/factories/src/main/java/com/linkedin/restli/server/RestliHandlerServlet.java @@ -1,23 +1,29 @@ package com.linkedin.restli.server; import com.linkedin.r2.transport.http.server.RAPJakartaServlet; +import jakarta.servlet.ServletConfig; import jakarta.servlet.ServletException; import jakarta.servlet.http.HttpServletRequest; import jakarta.servlet.http.HttpServletResponse; import java.io.IOException; import lombok.AllArgsConstructor; -import lombok.NoArgsConstructor; +import lombok.extern.slf4j.Slf4j; import org.springframework.beans.factory.annotation.Autowired; -import org.springframework.stereotype.Component; import org.springframework.web.HttpRequestHandler; import org.springframework.web.context.support.HttpRequestHandlerServlet; +@Slf4j @AllArgsConstructor -@NoArgsConstructor -@Component public class RestliHandlerServlet extends HttpRequestHandlerServlet implements HttpRequestHandler { @Autowired private RAPJakartaServlet _r2Servlet; + @Override + public void init(ServletConfig config) throws ServletException { + log.info("Initializing RestliHandlerServlet"); + this._r2Servlet.init(config); + log.info("Initialized RestliHandlerServlet"); + } + @Override public void service(HttpServletRequest req, HttpServletResponse res) throws ServletException, IOException { diff --git a/metadata-service/war/src/main/java/com/linkedin/gms/WebApplicationInitializer.java b/metadata-service/war/src/main/java/com/linkedin/gms/WebApplicationInitializer.java index fd32610be3532..4ed84e48a5049 100644 --- a/metadata-service/war/src/main/java/com/linkedin/gms/WebApplicationInitializer.java +++ b/metadata-service/war/src/main/java/com/linkedin/gms/WebApplicationInitializer.java @@ -42,18 +42,13 @@ public void onStartup(ServletContext container) { // Independent dispatcher schemaRegistryServlet(container); - // Non-Spring servlets - healthCheckServlet(container); - configServlet(container); - - // Restli non-Dispatcher - servletNames.add(restliServlet(rootContext, container)); - // Spring Dispatcher servlets DispatcherServlet dispatcherServlet = new DispatcherServlet(rootContext); servletNames.add(authServlet(rootContext, dispatcherServlet, container)); servletNames.add(graphQLServlet(rootContext, dispatcherServlet, container)); servletNames.add(openAPIServlet(rootContext, dispatcherServlet, container)); + // Restli non-Dispatcher default + servletNames.add(restliServlet(rootContext, container)); FilterRegistration.Dynamic filterRegistration = container.addFilter("authenticationFilter", AuthenticationFilter.class); @@ -62,6 +57,10 @@ public void onStartup(ServletContext container) { EnumSet.of(DispatcherType.ASYNC, DispatcherType.REQUEST), false, servletNames.toArray(String[]::new)); + + // Non-Spring servlets + healthCheckServlet(container); + configServlet(container); } /* @@ -133,10 +132,13 @@ private String restliServlet( rootContext.register(RestliServletConfig.class); ServletRegistration.Dynamic registration = - container.addServlet(servletName, new HttpRequestHandlerServlet()); + container.addServlet(servletName, HttpRequestHandlerServlet.class); registration.addMapping("/*"); registration.setLoadOnStartup(10); registration.setAsyncSupported(true); + registration.setInitParameter( + "org.springframework.web.servlet.FrameworkServlet.ORDER", + String.valueOf(Integer.MAX_VALUE - 1)); return servletName; } diff --git a/metadata-service/war/src/main/java/com/linkedin/gms/servlet/RestliServletConfig.java b/metadata-service/war/src/main/java/com/linkedin/gms/servlet/RestliServletConfig.java index 28e2aabe139da..222e2356dfb1c 100644 --- a/metadata-service/war/src/main/java/com/linkedin/gms/servlet/RestliServletConfig.java +++ b/metadata-service/war/src/main/java/com/linkedin/gms/servlet/RestliServletConfig.java @@ -7,14 +7,14 @@ import org.springframework.context.annotation.ComponentScan; import org.springframework.context.annotation.Configuration; import org.springframework.context.annotation.PropertySource; -import org.springframework.web.HttpRequestHandler; +import org.springframework.web.context.support.HttpRequestHandlerServlet; @ComponentScan(basePackages = {"com.linkedin.restli.server"}) @PropertySource(value = "classpath:/application.yaml", factory = YamlPropertySourceFactory.class) @Configuration public class RestliServletConfig { @Bean("restliRequestHandler") - public HttpRequestHandler restliHandlerServlet(final RAPJakartaServlet r2Servlet) { + public HttpRequestHandlerServlet restliHandlerServlet(final RAPJakartaServlet r2Servlet) { return new RestliHandlerServlet(r2Servlet); } } From 3fe39bc59b7ee914c418147c634736da8a5cd033 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Felix=20L=C3=BCdin?= <13187726+Masterchen09@users.noreply.github.com> Date: Mon, 16 Sep 2024 23:40:03 +0200 Subject: [PATCH 02/12] fix(docs): clarify clean-up of indices when restoring search and graph indices (#11380) --- docs/how/restore-indices.md | 51 +++++++++++++++++++++++++++++++------ 1 file changed, 43 insertions(+), 8 deletions(-) diff --git a/docs/how/restore-indices.md b/docs/how/restore-indices.md index 4b7fd77314ef7..368b385ae5ea5 100644 --- a/docs/how/restore-indices.md +++ b/docs/how/restore-indices.md @@ -7,24 +7,39 @@ When a new version of the aspect gets ingested, GMS initiates an MAE event for t the search and graph indices. As such, we can fetch the latest version of each aspect in the local database and produce MAE events corresponding to the aspects to restore the search and graph indices. +By default, restoring the indices from the local database will not remove any existing documents in +the search and graph indices that no longer exist in the local database, potentially leading to inconsistencies +between the search and graph indices and the local database. + ## Quickstart -If you're using the quickstart images, you can use the `datahub` cli to restore indices. +If you're using the quickstart images, you can use the `datahub` cli to restore the indices. -``` +```shell datahub docker quickstart --restore-indices ``` -See [this section](../quickstart.md#restoring-only-the-index-use-with-care) for more information. + +:::info +Using the `datahub` CLI to restore the indices when using the quickstart images will also clear the search and graph indices before restoring. + +See [this section](../quickstart.md#restore-datahub) for more information. ## Docker-compose -If you are on a custom docker-compose deployment, run the following command (you need to checkout [the source repository](https://github.com/datahub-project/datahub)) from the root of the repo to send MAE for each aspect in the Local DB. +If you are on a custom docker-compose deployment, run the following command (you need to checkout [the source repository](https://github.com/datahub-project/datahub)) from the root of the repo to send MAE for each aspect in the local database. -``` +```shell ./docker/datahub-upgrade/datahub-upgrade.sh -u RestoreIndices ``` -If you need to clear the search and graph indices before restoring, add `-a clean` to the end of the command. +:::info +By default this command will not clear the search and graph indices before restoring, thous potentially leading to inconsistencies between the local database and the indices, in case aspects were previously deleted in the local database but were not removed from the correponding index. + +If you need to clear the search and graph indices before restoring, add `-a clean` to the end of the command. Please take note that the search and graph services might not be fully functional during reindexing when the indices are cleared. + +```shell +./docker/datahub-upgrade/datahub-upgrade.sh -u RestoreIndices -a clean +``` Refer to this [doc](../../docker/datahub-upgrade/README.md#environment-variables) on how to set environment variables for your environment. @@ -44,11 +59,31 @@ If not, deploy latest helm charts to use this functionality. Once restore indices job template has been deployed, run the following command to start a job that restores indices. -``` +```shell kubectl create job --from=cronjob/datahub-datahub-restore-indices-job-template datahub-restore-indices-adhoc ``` -Once the job completes, your indices will have been restored. +Once the job completes, your indices will have been restored. + +:::info +By default the restore indices job template will not clear the search and graph indices before restoring, thous potentially leading to inconsistencies between the local database and the indices, in case aspects were previously deleted in the local database but were not removed from the correponding index. + +If you need to clear the search and graph indices before restoring, modify the `values.yaml` for your deployment and overwrite the default arguments of the restore indices job template to include the `-a clean` argument. Please take note that the search and graph services might not be fully functional during reindexing when the indices are cleared. + +```yaml +datahubUpgrade: + restoreIndices: + image: + args: + - "-u" + - "RestoreIndices" + - "-a" + - "batchSize=1000" # default value of datahubUpgrade.batchSize + - "-a" + - "batchDelayMs=100" # default value of datahubUpgrade.batchDelayMs + - "-a" + - "clean" +``` ## Through API From 49a51be7aaac86addb4762649e18b187d3546702 Mon Sep 17 00:00:00 2001 From: Harshal Sheth Date: Mon, 16 Sep 2024 14:56:10 -0700 Subject: [PATCH 03/12] feat(ingest): report ingest run for sample data (#11329) --- .../src/datahub/cli/docker_cli.py | 20 ++++--------------- 1 file changed, 4 insertions(+), 16 deletions(-) diff --git a/metadata-ingestion/src/datahub/cli/docker_cli.py b/metadata-ingestion/src/datahub/cli/docker_cli.py index 971d4e6e72aa1..86bcd7eff1cbf 100644 --- a/metadata-ingestion/src/datahub/cli/docker_cli.py +++ b/metadata-ingestion/src/datahub/cli/docker_cli.py @@ -36,7 +36,6 @@ from datahub.telemetry import telemetry from datahub.upgrade import upgrade from datahub.utilities.perf_timer import PerfTimer -from datahub.utilities.sample_data import BOOTSTRAP_MCES_FILE, download_sample_data logger = logging.getLogger(__name__) _ClickPositiveInt = click.IntRange(min=1) @@ -957,11 +956,6 @@ def valid_restore_options( @docker.command() -@click.option( - "--path", - type=click.Path(exists=True, dir_okay=False), - help=f"The MCE json file to ingest. Defaults to downloading {BOOTSTRAP_MCES_FILE} from GitHub", -) @click.option( "--token", type=str, @@ -970,13 +964,9 @@ def valid_restore_options( help="The token to be used when ingesting, used when datahub is deployed with METADATA_SERVICE_AUTH_ENABLED=true", ) @telemetry.with_telemetry() -def ingest_sample_data(path: Optional[str], token: Optional[str]) -> None: +def ingest_sample_data(token: Optional[str]) -> None: """Ingest sample data into a running DataHub instance.""" - if path is None: - click.echo("Downloading sample data...") - path = str(download_sample_data()) - # Verify that docker is up. status = check_docker_quickstart() if not status.is_ok(): @@ -989,10 +979,8 @@ def ingest_sample_data(path: Optional[str], token: Optional[str]) -> None: click.echo("Starting ingestion...") recipe: dict = { "source": { - "type": "file", - "config": { - "path": path, - }, + "type": "demo-data", + "config": {}, }, "sink": { "type": "datahub-rest", @@ -1003,7 +991,7 @@ def ingest_sample_data(path: Optional[str], token: Optional[str]) -> None: if token is not None: recipe["sink"]["config"]["token"] = token - pipeline = Pipeline.create(recipe, report_to=None) + pipeline = Pipeline.create(recipe) pipeline.run() ret = pipeline.pretty_print_summary() sys.exit(ret) From 1d297afe35f545966f5f3d677c16a4249972e6b6 Mon Sep 17 00:00:00 2001 From: david-leifker <114954101+david-leifker@users.noreply.github.com> Date: Mon, 16 Sep 2024 18:37:36 -0500 Subject: [PATCH 04/12] fix(ebean): upgrade ebean library (#11379) --- build.gradle | 11 ++- .../upgrade/nocode/CreateAspectTableStep.java | 2 +- .../acryl-spark-lineage/scripts/check_jar.sh | 6 +- .../datahub-protobuf/scripts/check_jar.sh | 5 +- .../spark-lineage-legacy/scripts/check_jar.sh | 3 +- metadata-io/build.gradle | 18 +++-- .../metadata/entity/EntityServiceImpl.java | 2 +- .../metadata/entity/ebean/EbeanAspectDao.java | 49 +++++++++---- .../metadata/entity/ebean/EbeanAspectV1.java | 33 ++++----- .../metadata/entity/ebean/EbeanAspectV2.java | 34 +++++---- .../entity/ebean/EbeanAspectDaoTest.java | 72 +++++++++++++++++++ 11 files changed, 170 insertions(+), 65 deletions(-) create mode 100644 metadata-io/src/test/java/com/linkedin/metadata/entity/ebean/EbeanAspectDaoTest.java diff --git a/build.gradle b/build.gradle index fbced335ddc2e..07ca1f09e813c 100644 --- a/build.gradle +++ b/build.gradle @@ -52,7 +52,7 @@ buildscript { ext.hadoop3Version = '3.3.6' ext.kafkaVersion = '5.5.15' ext.hazelcastVersion = '5.3.6' - ext.ebeanVersion = '12.16.1' + ext.ebeanVersion = '15.5.2' ext.googleJavaFormatVersion = '1.18.1' ext.openLineageVersion = '1.19.0' ext.logbackClassicJava8 = '1.2.12' @@ -104,8 +104,8 @@ project.ext.spec = [ project.ext.externalDependency = [ 'akkaHttp': 'com.typesafe.akka:akka-http-core_2.12:10.2.10', - 'antlr4Runtime': 'org.antlr:antlr4-runtime:4.7.2', - 'antlr4': 'org.antlr:antlr4:4.7.2', + 'antlr4Runtime': 'org.antlr:antlr4-runtime:4.9.3', + 'antlr4': 'org.antlr:antlr4:4.9.3', 'assertJ': 'org.assertj:assertj-core:3.11.1', 'avro': 'org.apache.avro:avro:1.11.3', 'avroCompiler': 'org.apache.avro:avro-compiler:1.11.3', @@ -129,8 +129,10 @@ project.ext.externalDependency = [ 'dropwizardMetricsCore': 'io.dropwizard.metrics:metrics-core:4.2.3', 'dropwizardMetricsJmx': 'io.dropwizard.metrics:metrics-jmx:4.2.3', 'ebean': 'io.ebean:ebean:' + ebeanVersion, + 'ebeanTest': 'io.ebean:ebean-test:' + ebeanVersion, 'ebeanAgent': 'io.ebean:ebean-agent:' + ebeanVersion, 'ebeanDdl': 'io.ebean:ebean-ddl-generator:' + ebeanVersion, + 'ebeanQueryBean': 'io.ebean:querybean-generator:' + ebeanVersion, 'elasticSearchRest': 'org.opensearch.client:opensearch-rest-high-level-client:' + elasticsearchVersion, 'elasticSearchJava': 'org.opensearch.client:opensearch-java:2.6.0', 'findbugsAnnotations': 'com.google.code.findbugs:annotations:3.0.1', @@ -359,6 +361,9 @@ configure(subprojects.findAll {! it.name.startsWith('spark-lineage')}) { exclude group: "org.slf4j", module: "slf4j-log4j12" exclude group: "org.slf4j", module: "slf4j-nop" exclude group: "org.slf4j", module: "slf4j-ext" + + resolutionStrategy.force externalDependency.antlr4Runtime + resolutionStrategy.force externalDependency.antlr4 } } diff --git a/datahub-upgrade/src/main/java/com/linkedin/datahub/upgrade/nocode/CreateAspectTableStep.java b/datahub-upgrade/src/main/java/com/linkedin/datahub/upgrade/nocode/CreateAspectTableStep.java index 4855cef95cb6e..63b319d943a80 100644 --- a/datahub-upgrade/src/main/java/com/linkedin/datahub/upgrade/nocode/CreateAspectTableStep.java +++ b/datahub-upgrade/src/main/java/com/linkedin/datahub/upgrade/nocode/CreateAspectTableStep.java @@ -77,7 +77,7 @@ public Function executable() { } try { - _server.execute(_server.createSqlUpdate(sqlUpdateStr)); + _server.execute(_server.sqlUpdate(sqlUpdateStr)); } catch (Exception e) { context.report().addLine("Failed to create table metadata_aspect_v2", e); return new DefaultUpgradeStepResult(id(), DataHubUpgradeState.FAILED); diff --git a/metadata-integration/java/acryl-spark-lineage/scripts/check_jar.sh b/metadata-integration/java/acryl-spark-lineage/scripts/check_jar.sh index 41b09e0705b89..d679f270095f1 100755 --- a/metadata-integration/java/acryl-spark-lineage/scripts/check_jar.sh +++ b/metadata-integration/java/acryl-spark-lineage/scripts/check_jar.sh @@ -42,7 +42,11 @@ for jarFile in ${jarFiles}; do grep -v "MetadataChangeProposal.avsc" |\ grep -v "io.openlineage" |\ grep -v "org.apache" |\ - grep -v "aix" + grep -v "aix" |\ + grep -v "scala" |\ + grep -v "io/micrometer/" |\ + grep -v "library.properties|rootdoc.txt" \| + grep -v "com/ibm/.*" if [ $? -ne 0 ]; then diff --git a/metadata-integration/java/datahub-protobuf/scripts/check_jar.sh b/metadata-integration/java/datahub-protobuf/scripts/check_jar.sh index fe3dd8d18f699..bd0c28f0f8698 100755 --- a/metadata-integration/java/datahub-protobuf/scripts/check_jar.sh +++ b/metadata-integration/java/datahub-protobuf/scripts/check_jar.sh @@ -41,7 +41,10 @@ jar -tvf $jarFile |\ grep -v "aix" |\ grep -v "com/sun/" |\ grep -v "VersionInfo.java" |\ - grep -v "mime.types" + grep -v "mime.types" |\ + grep -v "com/ibm/.*" |\ + grep -v "org/glassfish/" |\ + grep -v "LICENSE" if [ $? -ne 0 ]; then echo "✅ No unexpected class paths found in ${jarFile}" diff --git a/metadata-integration/java/spark-lineage-legacy/scripts/check_jar.sh b/metadata-integration/java/spark-lineage-legacy/scripts/check_jar.sh index 2dd3743ae2ced..81d6a541d1c2a 100755 --- a/metadata-integration/java/spark-lineage-legacy/scripts/check_jar.sh +++ b/metadata-integration/java/spark-lineage-legacy/scripts/check_jar.sh @@ -39,7 +39,8 @@ jar -tvf $jarFile |\ grep -v "library.properties" |\ grep -v "rootdoc.txt" |\ grep -v "VersionInfo.java" |\ - grep -v "mime.types" + grep -v "mime.types" |\ + grep -v "com/ibm/.*" if [ $? -ne 0 ]; then diff --git a/metadata-io/build.gradle b/metadata-io/build.gradle index 7e72767c08b79..09a41d100199d 100644 --- a/metadata-io/build.gradle +++ b/metadata-io/build.gradle @@ -1,6 +1,7 @@ plugins { id 'java-library' id 'pegasus' + id 'io.ebean' version "${ebeanVersion}" // Use the latest version from global build.gradle } configurations { @@ -50,8 +51,9 @@ dependencies { runtimeOnly externalDependency.jna api externalDependency.kafkaClients api externalDependency.ebean - enhance externalDependency.ebeanAgent + annotationProcessor externalDependency.ebeanQueryBean implementation externalDependency.ebeanDdl + implementation externalDependency.ebeanAgent implementation externalDependency.opentelemetryAnnotations implementation externalDependency.resilience4j // Newer Spring libraries require JDK17 classes, allow for JDK11 @@ -89,6 +91,7 @@ dependencies { testImplementation externalDependency.lombok testImplementation externalDependency.springBootTest testImplementation spec.product.pegasus.restliServer + testImplementation externalDependency.ebeanTest // logback >=1.3 required due to `testcontainers` only testImplementation 'ch.qos.logback:logback-classic:1.4.7' @@ -139,17 +142,12 @@ test { testLogging.exceptionFormat = 'full' } -tasks.withType(Test) { - enableAssertions = false +ebean { + debugLevel = 1 // 0 - 9 } -project.compileJava { - doLast { - ant.taskdef(name: 'ebean', classname: 'io.ebean.enhance.ant.AntEnhanceTask', - classpath: project.configurations.enhance.asPath) - ant.ebean(classSource: "${project.buildDir}/classes/java/main", packages: 'com.linkedin.metadata.entity.ebean', - transformArgs: 'debug=1') - } +tasks.withType(Test) { + enableAssertions = false } clean { diff --git a/metadata-io/src/main/java/com/linkedin/metadata/entity/EntityServiceImpl.java b/metadata-io/src/main/java/com/linkedin/metadata/entity/EntityServiceImpl.java index 69135a8a64805..34c98bba01af4 100644 --- a/metadata-io/src/main/java/com/linkedin/metadata/entity/EntityServiceImpl.java +++ b/metadata-io/src/main/java/com/linkedin/metadata/entity/EntityServiceImpl.java @@ -85,6 +85,7 @@ import com.linkedin.util.Pair; import io.datahubproject.metadata.context.OperationContext; import io.opentelemetry.extension.annotations.WithSpan; +import jakarta.persistence.EntityNotFoundException; import java.net.URISyntaxException; import java.nio.charset.StandardCharsets; import java.sql.Timestamp; @@ -111,7 +112,6 @@ import java.util.stream.StreamSupport; import javax.annotation.Nonnull; import javax.annotation.Nullable; -import javax.persistence.EntityNotFoundException; import lombok.Getter; import lombok.extern.slf4j.Slf4j; diff --git a/metadata-io/src/main/java/com/linkedin/metadata/entity/ebean/EbeanAspectDao.java b/metadata-io/src/main/java/com/linkedin/metadata/entity/ebean/EbeanAspectDao.java index 4304be1aa2a00..6233bf5e0e35c 100644 --- a/metadata-io/src/main/java/com/linkedin/metadata/entity/ebean/EbeanAspectDao.java +++ b/metadata-io/src/main/java/com/linkedin/metadata/entity/ebean/EbeanAspectDao.java @@ -41,6 +41,8 @@ import io.ebean.Transaction; import io.ebean.TxScope; import io.ebean.annotation.TxIsolation; +import jakarta.persistence.PersistenceException; +import jakarta.persistence.Table; import java.net.URISyntaxException; import java.sql.Timestamp; import java.time.Clock; @@ -62,8 +64,6 @@ import java.util.stream.Stream; import javax.annotation.Nonnull; import javax.annotation.Nullable; -import javax.persistence.PersistenceException; -import javax.persistence.Table; import lombok.extern.slf4j.Slf4j; @Slf4j @@ -790,10 +790,43 @@ public long getMaxVersion(@Nonnull final String urn, @Nonnull final String aspec return result.isEmpty() ? -1 : result.get(0).getVersion(); } + /** + * This method is only used as a fallback. It does incur an extra read-lock that is naturally a + * result of getLatestAspects(, forUpdate=true) + * + * @param urnAspects urn and aspect names to fetch + * @return map of the aspect's next version + */ public Map> getNextVersions( @Nonnull Map> urnAspects) { validateConnection(); + List forUpdateKeys = new ArrayList<>(); + + // initialize with default next version of 0 + Map> result = + new HashMap<>( + urnAspects.entrySet().stream() + .map( + entry -> { + Map defaultNextVersion = new HashMap<>(); + entry + .getValue() + .forEach( + aspectName -> { + defaultNextVersion.put(aspectName, ASPECT_LATEST_VERSION); + forUpdateKeys.add( + new EbeanAspectV2.PrimaryKey( + entry.getKey(), aspectName, ASPECT_LATEST_VERSION)); + }); + return Map.entry(entry.getKey(), defaultNextVersion); + }) + .collect(Collectors.toMap(Map.Entry::getKey, Map.Entry::getValue))); + + // forUpdate is required to avoid duplicate key violations (it is used as an indication that the + // max(version) was invalidated + _server.find(EbeanAspectV2.class).where().idIn(forUpdateKeys).forUpdate().findList(); + Junction queryJunction = _server .find(EbeanAspectV2.class) @@ -811,21 +844,11 @@ public Map> getNextVersions( } } - Map> result = new HashMap<>(); - // Default next version 0 - urnAspects.forEach( - (key, value) -> { - Map defaultNextVersion = new HashMap<>(); - value.forEach(aspectName -> defaultNextVersion.put(aspectName, 0L)); - result.put(key, defaultNextVersion); - }); - if (exp == null) { return result; } - // forUpdate is required to avoid duplicate key violations - List dbResults = exp.endOr().forUpdate().findIds(); + List dbResults = exp.endOr().findIds(); for (EbeanAspectV2.PrimaryKey key : dbResults) { if (result.get(key.getUrn()).get(key.getAspect()) <= key.getVersion()) { diff --git a/metadata-io/src/main/java/com/linkedin/metadata/entity/ebean/EbeanAspectV1.java b/metadata-io/src/main/java/com/linkedin/metadata/entity/ebean/EbeanAspectV1.java index 648b7cd6a65b0..844bc7797255c 100644 --- a/metadata-io/src/main/java/com/linkedin/metadata/entity/ebean/EbeanAspectV1.java +++ b/metadata-io/src/main/java/com/linkedin/metadata/entity/ebean/EbeanAspectV1.java @@ -2,29 +2,30 @@ import io.ebean.Model; import io.ebean.annotation.Index; +import jakarta.persistence.Column; +import jakarta.persistence.Embeddable; +import jakarta.persistence.EmbeddedId; +import jakarta.persistence.Entity; +import jakarta.persistence.Lob; +import jakarta.persistence.Table; +import java.io.Serializable; import java.sql.Timestamp; -import javax.persistence.Column; -import javax.persistence.Embeddable; -import javax.persistence.EmbeddedId; -import javax.persistence.Entity; -import javax.persistence.Lob; -import javax.persistence.Table; +import javax.annotation.Nonnull; import lombok.AllArgsConstructor; import lombok.EqualsAndHashCode; import lombok.Getter; import lombok.NoArgsConstructor; -import lombok.NonNull; import lombok.Setter; /** Schema definition for the legacy aspect table. */ @Getter @Setter +@NoArgsConstructor +@AllArgsConstructor @Entity @Table(name = "metadata_aspect") public class EbeanAspectV1 extends Model { - private static final long serialVersionUID = 1L; - public static final String ALL_COLUMNS = "*"; public static final String KEY_ID = "key"; public static final String URN_COLUMN = "urn"; @@ -41,16 +42,16 @@ public class EbeanAspectV1 extends Model { @AllArgsConstructor @NoArgsConstructor @EqualsAndHashCode - public static class PrimaryKey { + public static class PrimaryKey implements Serializable { private static final long serialVersionUID = 1L; - @NonNull + @Nonnull @Index @Column(name = URN_COLUMN, length = 500, nullable = false) private String urn; - @NonNull + @Nonnull @Index @Column(name = ASPECT_COLUMN, length = 200, nullable = false) private String aspect; @@ -60,18 +61,18 @@ public static class PrimaryKey { private long version; } - @NonNull @EmbeddedId @Index protected PrimaryKey key; + @Nonnull @EmbeddedId @Index protected PrimaryKey key; - @NonNull + @Nonnull @Lob @Column(name = METADATA_COLUMN, nullable = false) protected String metadata; - @NonNull + @Nonnull @Column(name = CREATED_ON_COLUMN, nullable = false) private Timestamp createdOn; - @NonNull + @Nonnull @Column(name = CREATED_BY_COLUMN, nullable = false) private String createdBy; diff --git a/metadata-io/src/main/java/com/linkedin/metadata/entity/ebean/EbeanAspectV2.java b/metadata-io/src/main/java/com/linkedin/metadata/entity/ebean/EbeanAspectV2.java index 71e52ed403b9b..407a47a91a454 100644 --- a/metadata-io/src/main/java/com/linkedin/metadata/entity/ebean/EbeanAspectV2.java +++ b/metadata-io/src/main/java/com/linkedin/metadata/entity/ebean/EbeanAspectV2.java @@ -4,19 +4,19 @@ import com.linkedin.metadata.entity.EntityAspectIdentifier; import io.ebean.Model; import io.ebean.annotation.Index; +import jakarta.persistence.Column; +import jakarta.persistence.Embeddable; +import jakarta.persistence.EmbeddedId; +import jakarta.persistence.Entity; +import jakarta.persistence.Lob; +import jakarta.persistence.Table; +import java.io.Serializable; import java.sql.Timestamp; import javax.annotation.Nonnull; -import javax.persistence.Column; -import javax.persistence.Embeddable; -import javax.persistence.EmbeddedId; -import javax.persistence.Entity; -import javax.persistence.Lob; -import javax.persistence.Table; import lombok.AllArgsConstructor; import lombok.EqualsAndHashCode; import lombok.Getter; import lombok.NoArgsConstructor; -import lombok.NonNull; import lombok.Setter; /** Schema definition for the new aspect table. */ @@ -28,8 +28,6 @@ @Table(name = "metadata_aspect_v2") public class EbeanAspectV2 extends Model { - private static final long serialVersionUID = 1L; - public static final String ALL_COLUMNS = "*"; public static final String KEY_ID = "key"; public static final String URN_COLUMN = "urn"; @@ -48,16 +46,16 @@ public class EbeanAspectV2 extends Model { @AllArgsConstructor @NoArgsConstructor @EqualsAndHashCode - public static class PrimaryKey { + public static class PrimaryKey implements Serializable { private static final long serialVersionUID = 1L; - @NonNull + @Nonnull @Index @Column(name = URN_COLUMN, length = 500, nullable = false) private String urn; - @NonNull + @Nonnull @Index @Column(name = ASPECT_COLUMN, length = 200, nullable = false) private String aspect; @@ -75,29 +73,29 @@ public EntityAspectIdentifier toAspectIdentifier() { } } - @NonNull @EmbeddedId @Index protected PrimaryKey key; + @Nonnull @EmbeddedId @Index protected PrimaryKey key; - @NonNull + @Nonnull @Column(name = URN_COLUMN, length = 500, nullable = false) private String urn; - @NonNull + @Nonnull @Column(name = ASPECT_COLUMN, length = 200, nullable = false) private String aspect; @Column(name = VERSION_COLUMN, nullable = false) private long version; - @NonNull + @Nonnull @Lob @Column(name = METADATA_COLUMN, nullable = false) protected String metadata; - @NonNull + @Nonnull @Column(name = CREATED_ON_COLUMN, nullable = false) private Timestamp createdOn; - @NonNull + @Nonnull @Column(name = CREATED_BY_COLUMN, nullable = false) private String createdBy; diff --git a/metadata-io/src/test/java/com/linkedin/metadata/entity/ebean/EbeanAspectDaoTest.java b/metadata-io/src/test/java/com/linkedin/metadata/entity/ebean/EbeanAspectDaoTest.java new file mode 100644 index 0000000000000..43123fb9872a0 --- /dev/null +++ b/metadata-io/src/test/java/com/linkedin/metadata/entity/ebean/EbeanAspectDaoTest.java @@ -0,0 +1,72 @@ +package com.linkedin.metadata.entity.ebean; + +import static org.mockito.Mockito.mock; +import static org.testng.Assert.assertEquals; +import static org.testng.Assert.assertTrue; + +import com.linkedin.metadata.EbeanTestUtils; +import com.linkedin.metadata.aspect.batch.AspectsBatch; +import com.linkedin.metadata.config.EbeanConfiguration; +import com.linkedin.metadata.entity.EbeanEntityServiceTest; +import io.ebean.Database; +import io.ebean.test.LoggedSql; +import java.util.List; +import java.util.Map; +import java.util.Set; +import org.testng.annotations.BeforeMethod; +import org.testng.annotations.Test; + +public class EbeanAspectDaoTest { + + private EbeanAspectDao testDao; + + @BeforeMethod + public void setupTest() { + Database server = EbeanTestUtils.createTestServer(EbeanEntityServiceTest.class.getSimpleName()); + testDao = new EbeanAspectDao(server, EbeanConfiguration.testDefault); + } + + @Test + public void testGetNextVersionForUpdate() { + LoggedSql.start(); + + testDao.runInTransactionWithRetryUnlocked( + (txContext) -> { + testDao.getNextVersions(Map.of("urn:li:corpuser:test", Set.of("status"))); + return ""; + }, + mock(AspectsBatch.class), + 0); + + // Get the captured SQL statements + List sql = + LoggedSql.stop().stream() + .filter(str -> !str.contains("INFORMATION_SCHEMA.TABLES")) + .toList(); + assertEquals(sql.size(), 2, String.format("Found: %s", sql)); + assertTrue( + sql.get(0).contains("for update;"), String.format("Did not find `for update` in %s ", sql)); + } + + @Test + public void testGetLatestAspectsForUpdate() { + LoggedSql.start(); + + testDao.runInTransactionWithRetryUnlocked( + (txContext) -> { + testDao.getLatestAspects(Map.of("urn:li:corpuser:test", Set.of("status")), true); + return ""; + }, + mock(AspectsBatch.class), + 0); + + // Get the captured SQL statements + List sql = + LoggedSql.stop().stream() + .filter(str -> !str.contains("INFORMATION_SCHEMA.TABLES")) + .toList(); + assertEquals(sql.size(), 1, String.format("Found: %s", sql)); + assertTrue( + sql.get(0).contains("for update;"), String.format("Did not find `for update` in %s ", sql)); + } +} From adc6b90363c8861f10bace8aaae878af4a0f7913 Mon Sep 17 00:00:00 2001 From: Alisa Aylward Date: Mon, 16 Sep 2024 21:19:56 -0400 Subject: [PATCH 05/12] fix(ingest/snowflake): Update snowflake_utils.py to account for iceberg tables (#11384) --- .../src/datahub/ingestion/source/snowflake/snowflake_utils.py | 1 + 1 file changed, 1 insertion(+) diff --git a/metadata-ingestion/src/datahub/ingestion/source/snowflake/snowflake_utils.py b/metadata-ingestion/src/datahub/ingestion/source/snowflake/snowflake_utils.py index 0177d59ef6b21..5e79530d2391b 100644 --- a/metadata-ingestion/src/datahub/ingestion/source/snowflake/snowflake_utils.py +++ b/metadata-ingestion/src/datahub/ingestion/source/snowflake/snowflake_utils.py @@ -125,6 +125,7 @@ def is_dataset_pattern_allowed( SnowflakeObjectDomain.EXTERNAL_TABLE, SnowflakeObjectDomain.VIEW, SnowflakeObjectDomain.MATERIALIZED_VIEW, + SnowflakeObjectDomain.ICEBERG_TABLE, ): return False if _is_sys_table(dataset_name): From 38bcd9c38197eb77b4693980bea0d57ffe23bbfe Mon Sep 17 00:00:00 2001 From: Harshal Sheth Date: Mon, 16 Sep 2024 23:11:58 -0700 Subject: [PATCH 06/12] feat(ingest): default to ASYNC_BATCH mode in datahub-rest sink (#11369) --- docs/how/updating-datahub.md | 1 + metadata-ingestion/sink_docs/datahub.md | 47 ++++---- .../src/datahub/emitter/rest_emitter.py | 36 +++++-- .../datahub/ingestion/sink/datahub_rest.py | 12 ++- .../datahub/utilities/partition_executor.py | 101 ++++++++++++++++-- .../tests/unit/test_pipeline.py | 4 +- .../unit/utilities/test_partition_executor.py | 8 ++ 7 files changed, 168 insertions(+), 41 deletions(-) diff --git a/docs/how/updating-datahub.md b/docs/how/updating-datahub.md index abb6bcd32a554..d8a6e4c6bdca0 100644 --- a/docs/how/updating-datahub.md +++ b/docs/how/updating-datahub.md @@ -37,6 +37,7 @@ This file documents any backwards-incompatible changes in DataHub and assists pe Re-running with stateful ingestion should automatically clear up the entities with old URNS and add entities with new URNs, therefore not duplicating the containers or jobs. - #11313 - `datahub get` will no longer return a key aspect for entities that don't exist. +- #11369 - The default datahub-rest sink mode has been changed to `ASYNC_BATCH`. This requires a server with version 0.14.0+. ### Potential Downtime diff --git a/metadata-ingestion/sink_docs/datahub.md b/metadata-ingestion/sink_docs/datahub.md index 8ddcf6aff1035..a60356fc3789f 100644 --- a/metadata-ingestion/sink_docs/datahub.md +++ b/metadata-ingestion/sink_docs/datahub.md @@ -29,6 +29,7 @@ sink: ``` If you are connecting to a hosted DataHub Cloud instance, your sink will look like + ```yml source: # source configs @@ -68,16 +69,17 @@ If you are using [UI based ingestion](../../docs/ui-ingestion.md) then where GMS Note that a `.` is used to denote nested fields in the YAML recipe. | Field | Required | Default | Description | -|----------------------------|----------|----------------------|----------------------------------------------------------------------------------------------------| -| `server` | ✅ | | URL of DataHub GMS endpoint. | +| -------------------------- | -------- | -------------------- | -------------------------------------------------------------------------------------------------- | +| `server` | ✅ | | URL of DataHub GMS endpoint. | +| `token` | | | Bearer token used for authentication. | | `timeout_sec` | | 30 | Per-HTTP request timeout. | | `retry_max_times` | | 1 | Maximum times to retry if HTTP request fails. The delay between retries is increased exponentially | | `retry_status_codes` | | [429, 502, 503, 504] | Retry HTTP request also on these status codes | -| `token` | | | Bearer token used for authentication. | | `extra_headers` | | | Extra headers which will be added to the request. | -| `max_threads` | | `15` | Experimental: Max parallelism for REST API calls | -| `ca_certificate_path` | | | Path to server's CA certificate for verification of HTTPS communications | -| `client_certificate_path` | | | Path to client's CA certificate for HTTPS communications | +| `max_threads` | | `15` | Max parallelism for REST API calls | +| `mode` | | `ASYNC_BATCH` | [Advanced] Mode of operation - `SYNC`, `ASYNC`, or `ASYNC_BATCH` | +| `ca_certificate_path` | | | Path to server's CA certificate for verification of HTTPS communications | +| `client_certificate_path` | | | Path to client's CA certificate for HTTPS communications | | `disable_ssl_verification` | | false | Disable ssl certificate validation | ## DataHub Kafka @@ -115,14 +117,14 @@ sink: Note that a `.` is used to denote nested fields in the YAML recipe. -| Field | Required | Default | Description | -| -------------------------------------------- | -------- | ------- | -------------------------------------------------------------------------------------------------------------------------------------------------------- | -| `connection.bootstrap` | ✅ | | Kafka bootstrap URL. | -| `connection.producer_config.