From ed1d35c79bb41d06ce646ef44bf7ad810fd229b6 Mon Sep 17 00:00:00 2001
From: Harshal Sheth
-- **Raise incident**: Automatically raise a new DataHub Incident for the Table whenever the Custom Assertion is failing. This
+- **Raise incident**: Automatically raise a new DataHub Incident for the Table whenever the Custom SQL Assertion is failing. This
  may indicate that the Table is unfit for consumption. Configure Slack Notifications under **Settings** to be notified when an incident is created due to an Assertion failure.
-- **Resolve incident**: Automatically resolve any incidents that were raised due to failures in this Custom Assertion. Note that
+- **Resolve incident**: Automatically resolve any incidents that were raised due to failures in this Custom SQL Assertion. Note that
  any other incidents will not be impacted.

1. Click **Save**.

-And that's it! DataHub will now begin to monitor your Custom Assertion for the table.
+And that's it! DataHub will now begin to monitor your Custom SQL Assertion for the table.

-To view the time of the next Custom Assertion evaluation, simply click **Custom** and then click on your
+To view the time of the next Custom SQL Assertion evaluation, simply click **Custom** and then click on your
 new Assertion:
@@ -198,12 +198,12 @@ Once your assertion has run, you will begin to see Success or Failure status for
-## Stopping a Custom Assertion
+## Stopping a Custom SQL Assertion

-In order to temporarily stop the evaluation of a Custom Assertion:
+In order to temporarily stop the evaluation of a Custom SQL Assertion:

 1. Navigate to the **Validations** tab of the Table with the assertion
-2. Click **Custom** to open the Custom Assertions list
+2. Click **Custom** to open the Custom SQL Assertions list
 3. Click the three-dot menu on the right side of the assertion you want to disable
 4. Click **Stop**

@@ -211,16 +211,16 @@ In order to temporarily stop the evaluation of a Custom Assertion:

-To resume the Custom Assertion, simply click **Turn On**.
+To resume the Custom SQL Assertion, simply click **Turn On**.

-## Creating Custom Assertions via API
+## Creating Custom SQL Assertions via API

-Under the hood, Acryl DataHub implements Custom Assertion Monitoring using two "entity" concepts:
+Under the hood, Acryl DataHub implements Custom SQL Assertion Monitoring using two "entity" concepts:

 - **Assertion**: The specific expectation for the custom assertion, e.g. "The table was changed in the past 7 hours" or "The table is changed on a schedule of every day by 8am". This is the "what".

@@ -233,15 +233,15 @@ Note that to create or delete Assertions and Monitors for a specific entity on D

 #### GraphQL

-In order to create a Custom Assertion that is being monitored on a specific **Evaluation Schedule**, you'll need to use 2
-GraphQL mutation queries to create a Custom Assertion entity and create an Assertion Monitor entity responsible for evaluating it.
+In order to create a Custom SQL Assertion that is being monitored on a specific **Evaluation Schedule**, you'll need to use 2
+GraphQL mutation queries to create a Custom SQL Assertion entity and create an Assertion Monitor entity responsible for evaluating it.

-Start by creating the Custom Assertion entity using the `createSqlAssertion` query and hang on to the 'urn' field of the Assertion entity
+Start by creating the Custom SQL Assertion entity using the `createSqlAssertion` query and hang on to the 'urn' field of the Assertion entity
 you get back. Then continue by creating a Monitor entity using the `createAssertionMonitor`.

 ##### Examples

-To create a Custom Assertion Entity that checks whether a query result is greater than 100:
+To create a Custom SQL Assertion Entity that checks whether a query result is greater than 100:

 ```json
 mutation createSqlAssertion {
@@ -265,7 +265,7 @@ mutation createSqlAssertion {
 }
 ```

-The supported custom assertion types are `METRIC` and `METRIC_CHANGE`. If you choose `METRIC_CHANGE`,
+The supported assertion types are `METRIC` and `METRIC_CHANGE`. If you choose `METRIC_CHANGE`,
 you will need to provide a `changeType` parameter with either `ABSOLUTE` or `PERCENTAGE` values.
 The supported operator types are `EQUAL_TO`, `NOT_EQUAL_TO`, `GREATER_THAN`, `GREATER_THAN_OR_EQUAL_TO`, `LESS_THAN`, `LESS_THAN_OR_EQUAL_TO`, and `BETWEEN` (requires minValue, maxValue).
 The supported parameter types are `NUMBER`.

From 54ec12a866868406e83464b21bab5147d11bc5fa Mon Sep 17 00:00:00 2001
From: Zachary McNellis
+
+:::note
+If you do not see the Ingestion tab, please get in touch with your DataHub admin to grant you the correct permissions.
+:::
+
+Navigate to the **Secrets** tab and click **Create new secret**.
+
+First, create a secret for the **Client Id**. The value should be the **Client Id** of the API key created in the [prior step](http://localhost:3000/docs/next/quick-ingestion-guides/looker/setup#create-an-api-key).
+
+Then, create a secret for the **Client Secret**. The value should be the **Client Secret** of the API key created in the [prior step](http://localhost:3000/docs/next/quick-ingestion-guides/looker/setup#create-an-api-key).
+
+## Configure Looker Ingestion
+
+### Configure Recipe
+
+Navigate to the **Sources** tab and click **Create new source**.
+
+Choose `Looker`.
+
+
+Enter the details into the Looker Recipe.
+
+* **Base URL:** This is your Looker instance URL. (i.e. `https://abc.cloud.looker.com`)
+* **Client ID:** Use the secret LOOKER_CLIENT_ID with the format `${LOOKER_CLIENT_ID}`.
+* **Client Secret:** Use the secret LOOKER_CLIENT_SECRET with the format `${LOOKER_CLIENT_SECRET}`.
+
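The recipe screenshot that accompanied this step is not reproduced here. As a rough sketch only (assuming the standard `looker` source fields `base_url`, `client_id`, and `client_secret`; the instance URL is a placeholder), the resulting recipe might look like:

```yaml
source:
  type: looker
  config:
    # Placeholder Looker instance URL, replace with your own
    base_url: "https://abc.cloud.looker.com"
    # Reference the secrets created in the previous step
    client_id: "${LOOKER_CLIENT_ID}"
    client_secret: "${LOOKER_CLIENT_SECRET}"
```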
+
+After completing the recipe, click **Next**.
+
+### Schedule Execution
+
+Now, it's time to schedule a recurring ingestion pipeline to extract metadata from your Looker instance regularly.
+
+Decide how regularly you want this ingestion to run: day, month, year, hour, minute, etc. Select from the dropdown.
+
+Ensure you've configured your correct timezone.
+
+Finally, click **Next** when you are done.
+
+### Finish Up
+
+Name your ingestion source, then click **Save and Run**.
+
+You will now find your new ingestion source running.
+
+## Configure LookML Connector
+
+Now that you have created a DataHub-specific API key and Deploy Key with the relevant access in [the prior step](setup.md), it's time to set up a connection via the DataHub UI.
+
+### Configure Recipe
+
+Navigate to the **Sources** tab and click **Create new source**.
+
+Choose `LookML`.
+
+
+Enter the details into the LookML Recipe. You need to set a minimum of 5 fields in the recipe for this quick ingestion guide:
+
+* **GitHub Repository:** This is your GitHub repository where LookML models are stored. You can provide the full URL (example: https://gitlab.com/gitlab-org/gitlab) or organization/repo; in this case, the connector assumes it is a GitHub repo.
+* **GitHub Deploy Key:** Copy the content of `looker_datahub_deploy_key` and paste it into this field.
+* **Looker Base URL:** This is your Looker instance URL. (i.e. https://abc.cloud.looker.com)
+* **Looker Client ID:** Use the secret LOOKER_CLIENT_ID with the format `${LOOKER_CLIENT_ID}`.
+* **Looker Client Secret:** Use the secret LOOKER_CLIENT_SECRET with the format `${LOOKER_CLIENT_SECRET}`.
+
+Your recipe should look something like this:
+
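The recipe screenshot for this step is likewise missing. As a hedged sketch (assuming the `lookml` source accepts a `github_info` block with `repo` and `deploy_key` plus an `api` block mirroring the Looker connection; the repository, secret names, and URL values are placeholders, so verify the exact field names against the LookML source docs):

```yaml
source:
  type: lookml
  config:
    github_info:
      # Placeholder organization/repo for your LookML project
      repo: "your-org/your-lookml-repo"
      # Paste the contents of looker_datahub_deploy_key, or reference a secret
      deploy_key: "${LOOKML_DEPLOY_KEY}"
    api:
      # Placeholder Looker instance URL, replace with your own
      base_url: "https://abc.cloud.looker.com"
      client_id: "${LOOKER_CLIENT_ID}"
      client_secret: "${LOOKER_CLIENT_SECRET}"
```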
+
+After completing the recipe, click **Next**.
+
+### Schedule Execution
+
+Now, it's time to schedule a recurring ingestion pipeline to extract metadata from your Looker instance regularly.
+
+Decide how regularly you want this ingestion to run: day, month, year, hour, minute, etc. Select from the dropdown.
+
+Ensure you've configured your correct timezone.
+
+Click **Next** when you are done.
+
+### Finish Up
+
+Name your ingestion source, then click **Save and Run**.
+
+You will now find your new ingestion source running.
+
+## Validate Ingestion Runs
+
+View the latest status of ingestion runs on the Ingestion page.
+
+Click the `+` sign to expand the complete list of historical runs and outcomes; click **Details** to see the results of a specific run.
+
+From the Ingestion Run Details page, pick **View All** to see which entities were ingested.
+
+Pick an entity from the list to manually validate if it contains the detail you expected.
+
+
+**Congratulations!** You've successfully set up Looker & LookML as an ingestion source for DataHub!
+
+*Need more help? Join the conversation in [Slack](http://slack.datahubproject.io)!*
diff --git a/docs/quick-ingestion-guides/looker/overview.md b/docs/quick-ingestion-guides/looker/overview.md
new file mode 100644
index 00000000000000..843d704526bcc4
--- /dev/null
+++ b/docs/quick-ingestion-guides/looker/overview.md
@@ -0,0 +1,52 @@
+---
+title: Overview
+---
+# Looker & LookML Ingestion Guide: Overview
+
+## What You Will Get Out of This Guide
+
+This guide will help you set up the Looker & LookML connectors to begin ingesting metadata into DataHub.
+Upon completing this guide, you will have a recurring ingestion pipeline to extract metadata from Looker & LookML and load it into DataHub.
+
+### Looker
+
+The Looker connector will ingest the following Looker asset types:
+
+* [Dashboards](https://cloud.google.com/looker/docs/dashboards)
+* [Charts](https://cloud.google.com/looker/docs/creating-visualizations)
+* [Explores](https://cloud.google.com/looker/docs/reference/param-explore-explore)
+* [Schemas](https://developers.looker.com/api/explorer/4.0/methods/Metadata/connection_schemas)
+* [Owners of Dashboards](https://cloud.google.com/looker/docs/creating-user-defined-dashboards)
+
+:::note
+
+To get complete Looker metadata integration (including Looker views and lineage to the underlying warehouse tables), you must also use the [lookml](https://datahubproject.io/docs/generated/ingestion/sources/looker#module-lookml) connector.
+
+:::
+
+### LookML
+
+The LookML connector will include the following LookML asset types:
+
+* [LookML views from model files in a project](https://cloud.google.com/looker/docs/reference/param-view-view)
+* [Metadata for dimensions](https://cloud.google.com/looker/docs/reference/param-field-dimension)
+* [Metadata for measures](https://cloud.google.com/looker/docs/reference/param-measure-types)
+* [Dimension Groups as tag](https://cloud.google.com/looker/docs/reference/param-field-dimension-group)
+
+:::note
+
+To get complete Looker metadata integration (including Looker views and lineage to the underlying warehouse tables), you must also use the [looker](https://datahubproject.io/docs/generated/ingestion/sources/looker#module-looker) connector.
+
+:::
+
+## Next Steps
+Please continue to the [setup guide](setup.md), where we'll describe the prerequisites.
+
+### Reference
+
+If you want to ingest metadata from Looker using the DataHub CLI, check out the following resources:
+* Learn about CLI Ingestion in the [Introduction to Metadata Ingestion](../../../metadata-ingestion/README.md)
+* [Looker Ingestion Source](https://datahubproject.io/docs/generated/ingestion/sources/Looker)
+
+*Need more help? Join the conversation in [Slack](http://slack.datahubproject.io)!*
diff --git a/docs/quick-ingestion-guides/looker/setup.md b/docs/quick-ingestion-guides/looker/setup.md
new file mode 100644
index 00000000000000..c08de116895ea5
--- /dev/null
+++ b/docs/quick-ingestion-guides/looker/setup.md
@@ -0,0 +1,156 @@
+---
+title: Setup
+---
+
+# Looker & LookML Ingestion Guide: Setup
+
+## Looker Prerequisites
+
+To configure ingestion from Looker, you'll first have to ensure you have an API key to access the Looker resources.
+
+### Login To Looker Instance
+
+Log in to your Looker instance (e.g. `https://
+
+### Create A New Permission Set
+
+On the **Roles** Panel, click **New Permission Set**.
+
+Set a name for the new permission set (e.g., *DataHub Connector Permission Set*) and select the following permissions.
+
+### Create A Role
+
+On the **Roles** Panel, click **New Role**.
+
+Set the name for the new role (e.g., *DataHub Extractor*) and set the following fields on this window.
+
+- Set **Permission Set** to the permission set created in the previous step (i.e. *DataHub Connector Permission Set*)
+- Set **Model Set** to `All`
+
+Finally, click **New Role** at the bottom of the page.
+
+### Create A New User
+
+On the **Admin** Panel, click **Users** to open the users panel.
+
+Click **Add Users**.
+
+On **Adding a new user**, set details in the following fields.
+
+- Add the user's **Email Addresses**.
+- Set **Roles** to the role created in the previous step (e.g. *DataHub Extractor*)
+
+Finally, click **Save**.
+
+### Create An API Key
+
+On the **User** Panel, click on the newly created user.
+
+Click **Edit Keys** to open the **API Key** Panel.
+
+
+On the **API Key** Panel, click **New API Key** to generate a new **Client ID** and **Client Secret**.
+
+## LookML Prerequisites
+
+Follow the steps below to create the GitHub Deploy Key.
+
+### Generate a private-public SSH key pair
+
+```bash
+ssh-keygen -t rsa -f looker_datahub_deploy_key
+```
+
+This will typically generate the two files below.
+* `looker_datahub_deploy_key` (private key)
+* `looker_datahub_deploy_key.pub` (public key)
+
+### Add Deploy Key to GitHub Repository
+
+First, log in to [GitHub](https://github.com).
+
+Navigate to **GitHub Repository** -> **Settings** -> **Deploy Keys** and add the public key (e.g. `looker_datahub_deploy_key.pub`) as a deploy key with read access.
+
+
+Make a note of the private key file. You must paste the file's contents into the GitHub Deploy Key field later while [configuring](./configuration.md) ingestion on the DataHub Portal.
+
+## Next Steps
+
+Once you've done all the above steps, it's time to move on to [configuring the actual ingestion source](configuration.md) within DataHub.
+
+_Need more help? Join the conversation in [Slack](http://slack.datahubproject.io)!_
\ No newline at end of file

From dc9141a6f405aba866dc8a31212dce9d9fe3fae3 Mon Sep 17 00:00:00 2001
From: Harshal Sheth
- Staging: mlflow_staging
- Archived: mlflow_archived
- None: mlflow_none |
diff --git a/metadata-ingestion/docs/sources/mlflow/mlflow_recipe.yml b/metadata-ingestion/docs/sources/mlflow/mlflow_recipe.yml
new file mode 100644
index 00000000000000..e40be543466294
--- /dev/null
+++ b/metadata-ingestion/docs/sources/mlflow/mlflow_recipe.yml
@@ -0,0 +1,8 @@
+source:
+ type: mlflow
+ config:
+ # Coordinates
+ tracking_uri: tracking_uri
+
+sink:
+ # sink configs
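For context, a slightly fuller recipe sketch is shown below. It only uses options that `MLflowConfig` defines later in this patch (`tracking_uri`, `registry_uri`, `model_name_separator`); the tracking URIs and the sink block are illustrative assumptions, with the sink pointing at a local DataHub GMS endpoint.

```yaml
source:
  type: mlflow
  config:
    # Point at your MLflow tracking server (placeholder URI)
    tracking_uri: "http://localhost:5000"
    # Optional: defaults to the tracking_uri behavior when unset
    registry_uri: "http://localhost:5000"
    # Separator between a model name and its version, e.g. mymodel_1
    model_name_separator: "_"

sink:
  type: datahub-rest
  config:
    server: "http://localhost:8080"
```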
diff --git a/metadata-ingestion/setup.py b/metadata-ingestion/setup.py
index 80e6950dc5ace5..65deadf16a5b33 100644
--- a/metadata-ingestion/setup.py
+++ b/metadata-ingestion/setup.py
@@ -344,6 +344,7 @@ def get_long_description():
"looker": looker_common,
"lookml": looker_common,
"metabase": {"requests"} | sqllineage_lib,
+ "mlflow": {"mlflow-skinny>=2.3.0"},
"mode": {"requests", "tenacity>=8.0.1"} | sqllineage_lib,
"mongodb": {"pymongo[srv]>=3.11", "packaging"},
"mssql": sql_common | {"sqlalchemy-pytds>=0.3"},
@@ -477,6 +478,7 @@ def get_long_description():
"elasticsearch",
"feast" if sys.version_info >= (3, 8) else None,
"iceberg" if sys.version_info >= (3, 8) else None,
+ "mlflow" if sys.version_info >= (3, 8) else None,
"json-schema",
"ldap",
"looker",
@@ -577,6 +579,7 @@ def get_long_description():
"lookml = datahub.ingestion.source.looker.lookml_source:LookMLSource",
"datahub-lineage-file = datahub.ingestion.source.metadata.lineage:LineageFileSource",
"datahub-business-glossary = datahub.ingestion.source.metadata.business_glossary:BusinessGlossaryFileSource",
+ "mlflow = datahub.ingestion.source.mlflow:MLflowSource",
"mode = datahub.ingestion.source.mode:ModeSource",
"mongodb = datahub.ingestion.source.mongodb:MongoDBSource",
"mssql = datahub.ingestion.source.sql.mssql:SQLServerSource",
diff --git a/metadata-ingestion/src/datahub/ingestion/source/mlflow.py b/metadata-ingestion/src/datahub/ingestion/source/mlflow.py
new file mode 100644
index 00000000000000..0668defe7b0c6c
--- /dev/null
+++ b/metadata-ingestion/src/datahub/ingestion/source/mlflow.py
@@ -0,0 +1,321 @@
+import sys
+
+if sys.version_info < (3, 8):
+ raise ImportError("MLflow is only supported on Python 3.8+")
+
+
+from dataclasses import dataclass
+from typing import Any, Callable, Iterable, Optional, TypeVar, Union
+
+from mlflow import MlflowClient
+from mlflow.entities import Run
+from mlflow.entities.model_registry import ModelVersion, RegisteredModel
+from mlflow.store.entities import PagedList
+from pydantic.fields import Field
+
+import datahub.emitter.mce_builder as builder
+from datahub.configuration.source_common import EnvConfigMixin
+from datahub.emitter.mcp import MetadataChangeProposalWrapper
+from datahub.ingestion.api.common import PipelineContext
+from datahub.ingestion.api.decorators import (
+ SupportStatus,
+ capability,
+ config_class,
+ platform_name,
+ support_status,
+)
+from datahub.ingestion.api.source import Source, SourceCapability, SourceReport
+from datahub.ingestion.api.workunit import MetadataWorkUnit
+from datahub.metadata.schema_classes import (
+ GlobalTagsClass,
+ MLHyperParamClass,
+ MLMetricClass,
+ MLModelGroupPropertiesClass,
+ MLModelPropertiesClass,
+ TagAssociationClass,
+ TagPropertiesClass,
+ VersionTagClass,
+ _Aspect,
+)
+
+T = TypeVar("T")
+
+
+class MLflowConfig(EnvConfigMixin):
+ tracking_uri: Optional[str] = Field(
+ default=None,
+ description="Tracking server URI. If not set, an MLflow default tracking_uri is used (local `mlruns/` directory or `MLFLOW_TRACKING_URI` environment variable)",
+ )
+ registry_uri: Optional[str] = Field(
+ default=None,
+ description="Registry server URI. If not set, an MLflow default registry_uri is used (value of tracking_uri or `MLFLOW_REGISTRY_URI` environment variable)",
+ )
+ model_name_separator: str = Field(
+ default="_",
+ description="A string which separates model name from its version (e.g. model_1 or model-1)",
+ )
+
+
+@dataclass
+class MLflowRegisteredModelStageInfo:
+ name: str
+ description: str
+ color_hex: str
+
+
+@platform_name("MLflow")
+@config_class(MLflowConfig)
+@support_status(SupportStatus.TESTING)
+@capability(
+ SourceCapability.DESCRIPTIONS,
+ "Extract descriptions for MLflow Registered Models and Model Versions",
+)
+@capability(SourceCapability.TAGS, "Extract tags for MLflow Registered Model Stages")
+class MLflowSource(Source):
+ platform = "mlflow"
+ registered_model_stages_info = (
+ MLflowRegisteredModelStageInfo(
+ name="Production",
+ description="Production Stage for an ML model in MLflow Model Registry",
+ color_hex="#308613",
+ ),
+ MLflowRegisteredModelStageInfo(
+ name="Staging",
+ description="Staging Stage for an ML model in MLflow Model Registry",
+ color_hex="#FACB66",
+ ),
+ MLflowRegisteredModelStageInfo(
+ name="Archived",
+ description="Archived Stage for an ML model in MLflow Model Registry",
+ color_hex="#5D7283",
+ ),
+ MLflowRegisteredModelStageInfo(
+ name="None",
+ description="None Stage for an ML model in MLflow Model Registry",
+ color_hex="#F2F4F5",
+ ),
+ )
+
+ def __init__(self, ctx: PipelineContext, config: MLflowConfig):
+ super().__init__(ctx)
+ self.config = config
+ self.report = SourceReport()
+ self.client = MlflowClient(
+ tracking_uri=self.config.tracking_uri,
+ registry_uri=self.config.registry_uri,
+ )
+
+ def get_report(self) -> SourceReport:
+ return self.report
+
+ def get_workunits_internal(self) -> Iterable[MetadataWorkUnit]:
+ yield from self._get_tags_workunits()
+ yield from self._get_ml_model_workunits()
+
+ def _get_tags_workunits(self) -> Iterable[MetadataWorkUnit]:
+ """
+ Create tags for each Stage in MLflow Model Registry.
+ """
+ for stage_info in self.registered_model_stages_info:
+ tag_urn = self._make_stage_tag_urn(stage_info.name)
+ tag_properties = TagPropertiesClass(
+ name=self._make_stage_tag_name(stage_info.name),
+ description=stage_info.description,
+ colorHex=stage_info.color_hex,
+ )
+ wu = self._create_workunit(urn=tag_urn, aspect=tag_properties)
+ yield wu
+
+ def _make_stage_tag_urn(self, stage_name: str) -> str:
+ tag_name = self._make_stage_tag_name(stage_name)
+ tag_urn = builder.make_tag_urn(tag_name)
+ return tag_urn
+
+ def _make_stage_tag_name(self, stage_name: str) -> str:
+ return f"{self.platform}_{stage_name.lower()}"
+
+ def _create_workunit(self, urn: str, aspect: _Aspect) -> MetadataWorkUnit:
+ """
+ Utility to create an MCP workunit.
+ """
+ return MetadataChangeProposalWrapper(
+ entityUrn=urn,
+ aspect=aspect,
+ ).as_workunit()
+
+ def _get_ml_model_workunits(self) -> Iterable[MetadataWorkUnit]:
+ """
+ Traverse each Registered Model in Model Registry and generate a corresponding workunit.
+ """
+ registered_models = self._get_mlflow_registered_models()
+ for registered_model in registered_models:
+ yield self._get_ml_group_workunit(registered_model)
+ model_versions = self._get_mlflow_model_versions(registered_model)
+ for model_version in model_versions:
+ run = self._get_mlflow_run(model_version)
+ yield self._get_ml_model_properties_workunit(
+ registered_model=registered_model,
+ model_version=model_version,
+ run=run,
+ )
+ yield self._get_global_tags_workunit(model_version=model_version)
+
+ def _get_mlflow_registered_models(self) -> Iterable[RegisteredModel]:
+ """
+ Get all Registered Models in MLflow Model Registry.
+ """
+ registered_models: Iterable[
+ RegisteredModel
+ ] = self._traverse_mlflow_search_func(
+ search_func=self.client.search_registered_models,
+ )
+ return registered_models
+
+ @staticmethod
+ def _traverse_mlflow_search_func(
+ search_func: Callable[..., PagedList[T]],
+ **kwargs: Any,
+ ) -> Iterable[T]:
+ """
+ Utility to traverse MLflow search_* functions, which return a PagedList.
+ """
+ next_page_token = None
+ while True:
+ paged_list = search_func(page_token=next_page_token, **kwargs)
+ yield from paged_list.to_list()
+ next_page_token = paged_list.token
+ if not next_page_token:
+ return
+
+ def _get_ml_group_workunit(
+ self,
+ registered_model: RegisteredModel,
+ ) -> MetadataWorkUnit:
+ """
+ Generate an MLModelGroup workunit for an MLflow Registered Model.
+ """
+ ml_model_group_urn = self._make_ml_model_group_urn(registered_model)
+ ml_model_group_properties = MLModelGroupPropertiesClass(
+ customProperties=registered_model.tags,
+ description=registered_model.description,
+ createdAt=registered_model.creation_timestamp,
+ )
+ wu = self._create_workunit(
+ urn=ml_model_group_urn,
+ aspect=ml_model_group_properties,
+ )
+ return wu
+
+ def _make_ml_model_group_urn(self, registered_model: RegisteredModel) -> str:
+ urn = builder.make_ml_model_group_urn(
+ platform=self.platform,
+ group_name=registered_model.name,
+ env=self.config.env,
+ )
+ return urn
+
+ def _get_mlflow_model_versions(
+ self,
+ registered_model: RegisteredModel,
+ ) -> Iterable[ModelVersion]:
+ """
+ Get all Model Versions for each Registered Model.
+ """
+ filter_string = f"name = '{registered_model.name}'"
+ model_versions: Iterable[ModelVersion] = self._traverse_mlflow_search_func(
+ search_func=self.client.search_model_versions,
+ filter_string=filter_string,
+ )
+ return model_versions
+
+ def _get_mlflow_run(self, model_version: ModelVersion) -> Union[None, Run]:
+ """
+ Get a Run associated with a Model Version. Some MVs may exist without Run.
+ """
+ if model_version.run_id:
+ run = self.client.get_run(model_version.run_id)
+ return run
+ else:
+ return None
+
+ def _get_ml_model_properties_workunit(
+ self,
+ registered_model: RegisteredModel,
+ model_version: ModelVersion,
+ run: Union[None, Run],
+ ) -> MetadataWorkUnit:
+ """
+ Generate an MLModel workunit for an MLflow Model Version.
+ Every Model Version is a DataHub MLModel entity associated with an MLModelGroup corresponding to a Registered Model.
+ If a model was registered without an associated Run then hyperparams and metrics are not available.
+ """
+ ml_model_group_urn = self._make_ml_model_group_urn(registered_model)
+ ml_model_urn = self._make_ml_model_urn(model_version)
+ if run:
+ hyperparams = [
+ MLHyperParamClass(name=k, value=str(v))
+ for k, v in run.data.params.items()
+ ]
+ training_metrics = [
+ MLMetricClass(name=k, value=str(v)) for k, v in run.data.metrics.items()
+ ]
+ else:
+ hyperparams = None
+ training_metrics = None
+ ml_model_properties = MLModelPropertiesClass(
+ customProperties=model_version.tags,
+ externalUrl=self._make_external_url(model_version),
+ description=model_version.description,
+ date=model_version.creation_timestamp,
+ version=VersionTagClass(versionTag=str(model_version.version)),
+ hyperParams=hyperparams,
+ trainingMetrics=training_metrics,
+ # mlflow tags are dicts, but datahub tags are lists. currently use only keys from mlflow tags
+ tags=list(model_version.tags.keys()),
+ groups=[ml_model_group_urn],
+ )
+ wu = self._create_workunit(urn=ml_model_urn, aspect=ml_model_properties)
+ return wu
+
+ def _make_ml_model_urn(self, model_version: ModelVersion) -> str:
+ urn = builder.make_ml_model_urn(
+ platform=self.platform,
+ model_name=f"{model_version.name}{self.config.model_name_separator}{model_version.version}",
+ env=self.config.env,
+ )
+ return urn
+
+ def _make_external_url(self, model_version: ModelVersion) -> Union[None, str]:
+ """
+ Generate URL for a Model Version to MLflow UI.
+ """
+ base_uri = self.client.tracking_uri
+ if base_uri.startswith("http"):
+ return f"{base_uri.rstrip('/')}/#/models/{model_version.name}/versions/{model_version.version}"
+ else:
+ return None
+
+ def _get_global_tags_workunit(
+ self,
+ model_version: ModelVersion,
+ ) -> MetadataWorkUnit:
+ """
+ Associate a Model Version Stage with a corresponding tag.
+ """
+ global_tags = GlobalTagsClass(
+ tags=[
+ TagAssociationClass(
+ tag=self._make_stage_tag_urn(model_version.current_stage),
+ ),
+ ]
+ )
+ wu = self._create_workunit(
+ urn=self._make_ml_model_urn(model_version),
+ aspect=global_tags,
+ )
+ return wu
+
+ @classmethod
+ def create(cls, config_dict: dict, ctx: PipelineContext) -> Source:
+ config = MLflowConfig.parse_obj(config_dict)
+ return cls(ctx, config)
diff --git a/metadata-ingestion/tests/integration/mlflow/mlflow_mcps_golden.json b/metadata-ingestion/tests/integration/mlflow/mlflow_mcps_golden.json
new file mode 100644
index 00000000000000..c70625c74d9983
--- /dev/null
+++ b/metadata-ingestion/tests/integration/mlflow/mlflow_mcps_golden.json
@@ -0,0 +1,238 @@
+[
+{
+ "entityType": "tag",
+ "entityUrn": "urn:li:tag:mlflow_production",
+ "changeType": "UPSERT",
+ "aspectName": "tagProperties",
+ "aspect": {
+ "json": {
+ "name": "mlflow_production",
+ "description": "Production Stage for an ML model in MLflow Model Registry",
+ "colorHex": "#308613"
+ }
+ },
+ "systemMetadata": {
+ "lastObserved": 1615443388097,
+ "runId": "mlflow-source-test"
+ }
+},
+{
+ "entityType": "tag",
+ "entityUrn": "urn:li:tag:mlflow_staging",
+ "changeType": "UPSERT",
+ "aspectName": "tagProperties",
+ "aspect": {
+ "json": {
+ "name": "mlflow_staging",
+ "description": "Staging Stage for an ML model in MLflow Model Registry",
+ "colorHex": "#FACB66"
+ }
+ },
+ "systemMetadata": {
+ "lastObserved": 1615443388097,
+ "runId": "mlflow-source-test"
+ }
+},
+{
+ "entityType": "tag",
+ "entityUrn": "urn:li:tag:mlflow_archived",
+ "changeType": "UPSERT",
+ "aspectName": "tagProperties",
+ "aspect": {
+ "json": {
+ "name": "mlflow_archived",
+ "description": "Archived Stage for an ML model in MLflow Model Registry",
+ "colorHex": "#5D7283"
+ }
+ },
+ "systemMetadata": {
+ "lastObserved": 1615443388097,
+ "runId": "mlflow-source-test"
+ }
+},
+{
+ "entityType": "tag",
+ "entityUrn": "urn:li:tag:mlflow_none",
+ "changeType": "UPSERT",
+ "aspectName": "tagProperties",
+ "aspect": {
+ "json": {
+ "name": "mlflow_none",
+ "description": "None Stage for an ML model in MLflow Model Registry",
+ "colorHex": "#F2F4F5"
+ }
+ },
+ "systemMetadata": {
+ "lastObserved": 1615443388097,
+ "runId": "mlflow-source-test"
+ }
+},
+{
+ "entityType": "mlModelGroup",
+ "entityUrn": "urn:li:mlModelGroup:(urn:li:dataPlatform:mlflow,test-model,PROD)",
+ "changeType": "UPSERT",
+ "aspectName": "mlModelGroupProperties",
+ "aspect": {
+ "json": {
+ "customProperties": {
+ "model_env": "test",
+ "model_id": "1"
+ },
+ "description": "This a test registered model",
+ "createdAt": 1615443388097
+ }
+ },
+ "systemMetadata": {
+ "lastObserved": 1615443388097,
+ "runId": "mlflow-source-test"
+ }
+},
+{
+ "entityType": "mlModel",
+ "entityUrn": "urn:li:mlModel:(urn:li:dataPlatform:mlflow,test-model_1,PROD)",
+ "changeType": "UPSERT",
+ "aspectName": "mlModelProperties",
+ "aspect": {
+ "json": {
+ "customProperties": {
+ "model_version_id": "1"
+ },
+ "date": 1615443388097,
+ "version": {
+ "versionTag": "1"
+ },
+ "hyperParams": [
+ {
+ "name": "p",
+ "value": "1"
+ }
+ ],
+ "trainingMetrics": [
+ {
+ "name": "m",
+ "value": "0.85"
+ }
+ ],
+ "tags": [
+ "model_version_id"
+ ],
+ "groups": [
+ "urn:li:mlModelGroup:(urn:li:dataPlatform:mlflow,test-model,PROD)"
+ ]
+ }
+ },
+ "systemMetadata": {
+ "lastObserved": 1615443388097,
+ "runId": "mlflow-source-test"
+ }
+},
+{
+ "entityType": "mlModel",
+ "entityUrn": "urn:li:mlModel:(urn:li:dataPlatform:mlflow,test-model_1,PROD)",
+ "changeType": "UPSERT",
+ "aspectName": "globalTags",
+ "aspect": {
+ "json": {
+ "tags": [
+ {
+ "tag": "urn:li:tag:mlflow_archived"
+ }
+ ]
+ }
+ },
+ "systemMetadata": {
+ "lastObserved": 1615443388097,
+ "runId": "mlflow-source-test"
+ }
+},
+{
+ "entityType": "mlModel",
+ "entityUrn": "urn:li:mlModel:(urn:li:dataPlatform:mlflow,test-model_1,PROD)",
+ "changeType": "UPSERT",
+ "aspectName": "status",
+ "aspect": {
+ "json": {
+ "removed": false
+ }
+ },
+ "systemMetadata": {
+ "lastObserved": 1615443388097,
+ "runId": "mlflow-source-test"
+ }
+},
+{
+ "entityType": "mlModelGroup",
+ "entityUrn": "urn:li:mlModelGroup:(urn:li:dataPlatform:mlflow,test-model,PROD)",
+ "changeType": "UPSERT",
+ "aspectName": "status",
+ "aspect": {
+ "json": {
+ "removed": false
+ }
+ },
+ "systemMetadata": {
+ "lastObserved": 1615443388097,
+ "runId": "mlflow-source-test"
+ }
+},
+{
+ "entityType": "tag",
+ "entityUrn": "urn:li:tag:mlflow_staging",
+ "changeType": "UPSERT",
+ "aspectName": "status",
+ "aspect": {
+ "json": {
+ "removed": false
+ }
+ },
+ "systemMetadata": {
+ "lastObserved": 1615443388097,
+ "runId": "mlflow-source-test"
+ }
+},
+{
+ "entityType": "tag",
+ "entityUrn": "urn:li:tag:mlflow_archived",
+ "changeType": "UPSERT",
+ "aspectName": "status",
+ "aspect": {
+ "json": {
+ "removed": false
+ }
+ },
+ "systemMetadata": {
+ "lastObserved": 1615443388097,
+ "runId": "mlflow-source-test"
+ }
+},
+{
+ "entityType": "tag",
+ "entityUrn": "urn:li:tag:mlflow_production",
+ "changeType": "UPSERT",
+ "aspectName": "status",
+ "aspect": {
+ "json": {
+ "removed": false
+ }
+ },
+ "systemMetadata": {
+ "lastObserved": 1615443388097,
+ "runId": "mlflow-source-test"
+ }
+},
+{
+ "entityType": "tag",
+ "entityUrn": "urn:li:tag:mlflow_none",
+ "changeType": "UPSERT",
+ "aspectName": "status",
+ "aspect": {
+ "json": {
+ "removed": false
+ }
+ },
+ "systemMetadata": {
+ "lastObserved": 1615443388097,
+ "runId": "mlflow-source-test"
+ }
+}
+]
\ No newline at end of file
diff --git a/metadata-ingestion/tests/integration/mlflow/test_mlflow_source.py b/metadata-ingestion/tests/integration/mlflow/test_mlflow_source.py
new file mode 100644
index 00000000000000..76af666526555a
--- /dev/null
+++ b/metadata-ingestion/tests/integration/mlflow/test_mlflow_source.py
@@ -0,0 +1,104 @@
+import sys
+
+if sys.version_info >= (3, 8):
+ from pathlib import Path
+ from typing import Any, Dict, TypeVar
+
+ import pytest
+ from mlflow import MlflowClient
+
+ from datahub.ingestion.run.pipeline import Pipeline
+ from tests.test_helpers import mce_helpers
+
+ T = TypeVar("T")
+
+ @pytest.fixture
+ def tracking_uri(tmp_path: Path) -> str:
+ return str(tmp_path / "mlruns")
+
+ @pytest.fixture
+ def sink_file_path(tmp_path: Path) -> str:
+ return str(tmp_path / "mlflow_source_mcps.json")
+
+ @pytest.fixture
+ def pipeline_config(tracking_uri: str, sink_file_path: str) -> Dict[str, Any]:
+ source_type = "mlflow"
+ return {
+ "run_id": "mlflow-source-test",
+ "source": {
+ "type": source_type,
+ "config": {
+ "tracking_uri": tracking_uri,
+ },
+ },
+ "sink": {
+ "type": "file",
+ "config": {
+ "filename": sink_file_path,
+ },
+ },
+ }
+
+ @pytest.fixture
+ def generate_mlflow_data(tracking_uri: str) -> None:
+ client = MlflowClient(tracking_uri=tracking_uri)
+ experiment_name = "test-experiment"
+ run_name = "test-run"
+ model_name = "test-model"
+ test_experiment_id = client.create_experiment(experiment_name)
+ test_run = client.create_run(
+ experiment_id=test_experiment_id,
+ run_name=run_name,
+ )
+ client.log_param(
+ run_id=test_run.info.run_id,
+ key="p",
+ value=1,
+ )
+ client.log_metric(
+ run_id=test_run.info.run_id,
+ key="m",
+ value=0.85,
+ )
+ client.create_registered_model(
+ name=model_name,
+ tags=dict(
+ model_id=1,
+ model_env="test",
+ ),
+ description="This is a test registered model",
+ )
+ client.create_model_version(
+ name=model_name,
+ source="dummy_dir/dummy_file",
+ run_id=test_run.info.run_id,
+ tags=dict(model_version_id=1),
+ )
+ client.transition_model_version_stage(
+ name=model_name,
+ version="1",
+ stage="Archived",
+ )
+
+ def test_ingestion(
+ pytestconfig,
+ mock_time,
+ sink_file_path,
+ pipeline_config,
+ generate_mlflow_data,
+ ):
+ print(f"MCPs file path: {sink_file_path}")
+ golden_file_path = (
+ pytestconfig.rootpath / "tests/integration/mlflow/mlflow_mcps_golden.json"
+ )
+
+ pipeline = Pipeline.create(pipeline_config)
+ pipeline.run()
+ pipeline.pretty_print_summary()
+ pipeline.raise_from_status()
+
+ mce_helpers.check_golden_file(
+ pytestconfig=pytestconfig,
+ output_path=sink_file_path,
+ golden_path=golden_file_path,
+ )
diff --git a/metadata-ingestion/tests/unit/test_mlflow_source.py b/metadata-ingestion/tests/unit/test_mlflow_source.py
new file mode 100644
index 00000000000000..97b5afd3d6a4ef
--- /dev/null
+++ b/metadata-ingestion/tests/unit/test_mlflow_source.py
@@ -0,0 +1,133 @@
+import sys
+
+if sys.version_info >= (3, 8):
+ import datetime
+ from pathlib import Path
+ from typing import Any, TypeVar, Union
+
+ import pytest
+ from mlflow import MlflowClient
+ from mlflow.entities.model_registry import RegisteredModel
+ from mlflow.entities.model_registry.model_version import ModelVersion
+ from mlflow.store.entities import PagedList
+
+ from datahub.ingestion.api.common import PipelineContext
+ from datahub.ingestion.source.mlflow import MLflowConfig, MLflowSource
+
+ T = TypeVar("T")
+
+ @pytest.fixture
+ def tracking_uri(tmp_path: Path) -> str:
+ return str(tmp_path / "mlruns")
+
+ @pytest.fixture
+ def source(tracking_uri: str) -> MLflowSource:
+ return MLflowSource(
+ ctx=PipelineContext(run_id="mlflow-source-test"),
+ config=MLflowConfig(tracking_uri=tracking_uri),
+ )
+
+ @pytest.fixture
+ def registered_model(source: MLflowSource) -> RegisteredModel:
+ model_name = "abc"
+ return RegisteredModel(name=model_name)
+
+ @pytest.fixture
+ def model_version(
+ source: MLflowSource,
+ registered_model: RegisteredModel,
+ ) -> ModelVersion:
+ version = "1"
+ return ModelVersion(
+ name=registered_model.name,
+ version=version,
+ creation_timestamp=datetime.datetime.now(),
+ )
+
+ def dummy_search_func(page_token: Union[None, str], **kwargs: Any) -> PagedList[T]:
+ dummy_pages = dict(
+ page_1=PagedList(items=["a", "b"], token="page_2"),
+ page_2=PagedList(items=["c", "d"], token="page_3"),
+ page_3=PagedList(items=["e"], token=None),
+ )
+ if page_token is None:
+ page_to_return = dummy_pages["page_1"]
+ else:
+ page_to_return = dummy_pages[page_token]
+ if kwargs.get("case", "") == "upper":
+ page_to_return = PagedList(
+ items=[e.upper() for e in page_to_return.to_list()],
+ token=page_to_return.token,
+ )
+ return page_to_return
+
+ def test_stages(source):
+ mlflow_registered_model_stages = {
+ "Production",
+ "Staging",
+ "Archived",
+ None,
+ }
+ workunits = source._get_tags_workunits()
+ names = [wu.get_metadata()["metadata"].aspect.name for wu in workunits]
+
+ assert len(names) == len(mlflow_registered_model_stages)
+ assert set(names) == {
+ "mlflow_" + str(stage).lower() for stage in mlflow_registered_model_stages
+ }
+
+ def test_config_model_name_separator(source, model_version):
+ name_version_sep = "+"
+ source.config.model_name_separator = name_version_sep
+ expected_model_name = (
+ f"{model_version.name}{name_version_sep}{model_version.version}"
+ )
+ expected_urn = f"urn:li:mlModel:(urn:li:dataPlatform:mlflow,{expected_model_name},{source.config.env})"
+
+ urn = source._make_ml_model_urn(model_version)
+
+ assert urn == expected_urn
+
+ def test_model_without_run(source, registered_model, model_version):
+ run = source._get_mlflow_run(model_version)
+ wu = source._get_ml_model_properties_workunit(
+ registered_model=registered_model,
+ model_version=model_version,
+ run=run,
+ )
+ aspect = wu.get_metadata()["metadata"].aspect
+
+ assert aspect.hyperParams is None
+ assert aspect.trainingMetrics is None
+
+ def test_traverse_mlflow_search_func(source):
+ expected_items = ["a", "b", "c", "d", "e"]
+
+ items = list(source._traverse_mlflow_search_func(dummy_search_func))
+
+ assert items == expected_items
+
+ def test_traverse_mlflow_search_func_with_kwargs(source):
+ expected_items = ["A", "B", "C", "D", "E"]
+
+ items = list(
+ source._traverse_mlflow_search_func(dummy_search_func, case="upper")
+ )
+
+ assert items == expected_items
+
+ def test_make_external_link_local(source, model_version):
+ expected_url = None
+
+ url = source._make_external_url(model_version)
+
+ assert url == expected_url
+
+ def test_make_external_link_remote(source, model_version):
+ tracking_uri_remote = "https://dummy-mlflow-tracking-server.org"
+ source.client = MlflowClient(tracking_uri=tracking_uri_remote)
+ expected_url = f"{tracking_uri_remote}/#/models/{model_version.name}/versions/{model_version.version}"
+
+ url = source._make_external_url(model_version)
+
+ assert url == expected_url
diff --git a/metadata-service/war/src/main/resources/boot/data_platforms.json b/metadata-service/war/src/main/resources/boot/data_platforms.json
index 7a7cec60aa25f0..3d956c5774dedb 100644
--- a/metadata-service/war/src/main/resources/boot/data_platforms.json
+++ b/metadata-service/war/src/main/resources/boot/data_platforms.json
@@ -346,6 +346,16 @@
"logoUrl": "/assets/platforms/sagemakerlogo.png"
}
},
+ {
+ "urn": "urn:li:dataPlatform:mlflow",
+ "aspect": {
+ "datasetNameDelimiter": ".",
+ "name": "mlflow",
+ "displayName": "MLflow",
+ "type": "OTHERS",
+ "logoUrl": "/assets/platforms/mlflowlogo.png"
+ }
+ },
{
"urn": "urn:li:dataPlatform:glue",
"aspect": {
From ca4dc4e3d228e0612a42e7a3e0895573ab38586b Mon Sep 17 00:00:00 2001
From: Pedro Silva
- Built with ❤️ by{" "} - {" "} + Built with ❤️ by {" "} Acryl Data {" "} - and LinkedIn. + and LinkedIn.
Get Started → @@ -51,10 +50,10 @@ const Hero = ({}) => {Run the following command to get started with DataHub.
-Run the following command to get started with DataHub.
+