From 7fd0cff08eef8d475759c7492cc9e59b107218b2 Mon Sep 17 00:00:00 2001
From: WiktorMadejski
Date: Thu, 1 Feb 2024 04:21:57 +0100
Subject: [PATCH] doc: FileSystemMetricsRepository sync on DBFS tutorial (#187)

* FileSystemMetricsRepository sync on DBFS tutorial

I needed to migrate a python-deequ application - I initially expected this task to take 1-2 hours, but it took me a lot more than that. I hope this helps other people.

* Add files via upload

* Rename repository_sync_dbfs.ipynb to repository_file_dbfs.ipynb

* Update repository_file_dbfs.ipynb

* Update repository_file_dbfs.ipynb

* Update repository_file_dbfs.ipynb

* Update repository_file_dbfs.ipynb
---
 tutorials/repository_file_dbfs.ipynb | 521 +++++++++++++++++++++++++++
 1 file changed, 521 insertions(+)
 create mode 100644 tutorials/repository_file_dbfs.ipynb

diff --git a/tutorials/repository_file_dbfs.ipynb b/tutorials/repository_file_dbfs.ipynb
new file mode 100644
index 0000000..cef3b7a
--- /dev/null
+++ b/tutorials/repository_file_dbfs.ipynb
@@ -0,0 +1,521 @@
+{
+ "cells": [
+  {
+   "cell_type": "markdown",
+   "metadata": {
+    "application/vnd.databricks.v1+cell": {
+     "cellMetadata": {
+      "byteLimit": 2048000,
+      "rowLimit": 10000
+     },
+     "inputWidgets": {},
+     "nuid": "3d9f38cf-e24a-4854-8027-09414874fb35",
+     "showTitle": false,
+     "title": ""
+    }
+   },
+   "source": [
+    "#### Manage the FileSystemMetricsRepository metrics JSON file stored on DBFS between runs\n",
+    "\n",
+    "PyDeequ allows us to persist metrics in a so-called MetricsRepository. FileSystemMetricsRepository implements MetricsRepository and allows the repository to be materialized to a JSON file.\n",
+    "\n",
+    "This tutorial demonstrates\n",
+    "- how to access the data materialized by FileSystemMetricsRepository - metrics.json (section 1),\n",
+    "- how to create a FileSystemMetricsRepository with managed data (section 2.3),\n",
+    "- how to run a VerificationSuite using specific managed data (all sections).\n",
+    "\n",
+    "This can be especially useful:\n",
+    "- for migrating a python-deequ application from one Databricks Workspace to another,\n",
+    "- for managing the MetricsRepository JSON on the application side,\n",
+    "- for enabling explainability and analytics on top of the MetricsRepository JSON.\n",
+    "\n",
+    "Note: As of the 1.1.0 release of Python Deequ, initializing the repository JSON via FileSystemMetricsRepository is the only way to run validations against historical metrics. InMemoryMetricsRepository does not support initialization from historical metrics (see the short sketch below)."
+   ]
+  },
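+  {
+   "cell_type": "markdown",
+   "metadata": {},
+   "source": [
+    "As a minimal sketch of that difference (both classes live in pydeequ.repository; the explicit path below is purely illustrative, not a required location):"
+   ]
+  },
+  {
+   "cell_type": "code",
+   "execution_count": 0,
+   "metadata": {},
+   "outputs": [],
+   "source": [
+    "from pydeequ.repository import FileSystemMetricsRepository, InMemoryMetricsRepository\n",
+    "\n",
+    "# File-backed: metrics are materialized to a JSON file, so a later run\n",
+    "# (or another workspace) can be pointed at the same history.\n",
+    "# The path is an illustrative placeholder.\n",
+    "file_repo = FileSystemMetricsRepository(spark, \"dbfs:/table_xyz/metrics.json\")\n",
+    "\n",
+    "# In-memory: metrics live only for the lifetime of the Spark session and\n",
+    "# cannot be seeded from a previously persisted JSON file.\n",
+    "memory_repo = InMemoryMetricsRepository(spark)"
+   ]
+  },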
+  {
+   "cell_type": "markdown",
+   "metadata": {
+    "application/vnd.databricks.v1+cell": {
+     "cellMetadata": {
+      "byteLimit": 2048000,
+      "rowLimit": 10000
+     },
+     "inputWidgets": {},
+     "nuid": "71833639-88d8-4151-8011-d5d659253e00",
+     "showTitle": false,
+     "title": ""
+    }
+   },
+   "source": [
+    "#### 1) Simulate a historical anomaly run"
+   ]
+  },
+  {
+   "cell_type": "markdown",
+   "metadata": {
+    "application/vnd.databricks.v1+cell": {
+     "cellMetadata": {
+      "byteLimit": 2048000,
+      "rowLimit": 10000
+     },
+     "inputWidgets": {},
+     "nuid": "2e7debbf-532b-446c-99ef-89ad9c580742",
+     "showTitle": false,
+     "title": ""
+    }
+   },
+   "source": [
+    "##### 1.1) Create FileSystemMetricsRepository with an autogenerated path\n",
+    "\n",
+    "When the FileSystemMetricsRepository is initialized without an explicitly specified path, it generates one automatically."
+   ]
+  },
+  {
+   "cell_type": "code",
+   "execution_count": 0,
+   "metadata": {
+    "application/vnd.databricks.v1+cell": {
+     "cellMetadata": {
+      "byteLimit": 2048000,
+      "rowLimit": 10000
+     },
+     "inputWidgets": {},
+     "nuid": "a1b22354-660c-4177-be3a-1e5c18e97be1",
+     "showTitle": false,
+     "title": ""
+    }
+   },
+   "outputs": [],
+   "source": [
+    "from pydeequ.repository import FileSystemMetricsRepository\n",
+    "\n",
+    "repository = FileSystemMetricsRepository(spark)\n",
+    "print(repository.path)"
+   ]
+  },
+  {
+   "cell_type": "markdown",
+   "metadata": {
+    "application/vnd.databricks.v1+cell": {
+     "cellMetadata": {
+      "byteLimit": 2048000,
+      "rowLimit": 10000
+     },
+     "inputWidgets": {},
+     "nuid": "6a6cb69f-1219-4b32-8f44-e5a6d0687ffe",
+     "showTitle": false,
+     "title": ""
+    }
+   },
+   "source": [
+    "This path determines where the JSON file containing the metrics for consecutive PyDeequ runs is materialized. PyDeequ, in turn, matches check definitions against the persisted data model and uses the underlying metrics to calculate specific anomalies."
+   ]
+  },
+  {
+   "cell_type": "markdown",
+   "metadata": {
+    "application/vnd.databricks.v1+cell": {
+     "cellMetadata": {
+      "byteLimit": 2048000,
+      "rowLimit": 10000
+     },
+     "inputWidgets": {},
+     "nuid": "b50a2fde-1836-4317-a893-c5e25c6f82e4",
+     "showTitle": false,
+     "title": ""
+    }
+   },
+   "source": [
+    "##### 1.2) Run anomaly checks"
+   ]
+  },
+  {
+   "cell_type": "code",
+   "execution_count": 0,
+   "metadata": {
+    "application/vnd.databricks.v1+cell": {
+     "cellMetadata": {
+      "byteLimit": 2048000,
+      "rowLimit": 10000
+     },
+     "inputWidgets": {},
+     "nuid": "3a8cef92-4212-412d-9e99-9efc7c946238",
+     "showTitle": false,
+     "title": ""
+    }
+   },
+   "outputs": [],
+   "source": [
+    "from pydeequ.repository import ResultKey\n",
+    "from pydeequ.verification import VerificationSuite\n",
+    "from pydeequ.anomaly_detection import RelativeRateOfChangeStrategy\n",
+    "from pydeequ.analyzers import Mean\n",
+    "\n",
+    "COLUMN_NAME = \"age\"\n",
+    "\n",
+    "# Two rows with a mean age of 20 - this run becomes the historical baseline.\n",
+    "df_xyz = spark.createDataFrame([{COLUMN_NAME: 19}, {COLUMN_NAME: 21}])\n",
+    "\n",
+    "verification_suite = (\n",
+    "    VerificationSuite(spark)\n",
+    "    .onData(df_xyz)\n",
+    "    .useRepository(repository)\n",
+    "    .saveOrAppendResult(\n",
+    "        ResultKey(\n",
+    "            spark,\n",
+    "            ResultKey.current_milli_time(),\n",
+    "            {\"tag\": \"historical-run-0\"}\n",
+    "        )\n",
+    "    )\n",
+    "    .addAnomalyCheck(\n",
+    "        RelativeRateOfChangeStrategy(\n",
+    "            maxRateDecrease=0.8,\n",
+    "            maxRateIncrease=1.2\n",
+    "        ),\n",
+    "        Mean(COLUMN_NAME)\n",
+    "    )\n",
+    ")\n",
+    "results = verification_suite.run()"
+   ]
+  },
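+  {
+   "cell_type": "markdown",
+   "metadata": {},
+   "source": [
+    "The stored metrics can also be queried straight from the repository object. A minimal sketch, assuming PyDeequ's repository loader API (load / before / getSuccessMetricsAsDataFrame):"
+   ]
+  },
+  {
+   "cell_type": "code",
+   "execution_count": 0,
+   "metadata": {},
+   "outputs": [],
+   "source": [
+    "# Load every metric written before \"now\" as a Spark DataFrame.\n",
+    "metrics_df = (\n",
+    "    repository.load()\n",
+    "    .before(ResultKey.current_milli_time())\n",
+    "    .getSuccessMetricsAsDataFrame()\n",
+    ")\n",
+    "metrics_df.show()"
+   ]
+  },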
+  {
+   "cell_type": "markdown",
+   "metadata": {
+    "application/vnd.databricks.v1+cell": {
+     "cellMetadata": {
+      "byteLimit": 2048000,
+      "rowLimit": 10000
+     },
+     "inputWidgets": {},
+     "nuid": "9ea4b0b4-d455-45b9-8551-aa8278258e42",
+     "showTitle": false,
+     "title": ""
+    }
+   },
+   "source": [
+    "##### 1.3) Verify that metrics.json is persisted to DBFS\n",
+    "\n",
+    "Calling VerificationSuite.run() persists the metrics to the file underlying the FileSystemMetricsRepository."
+   ]
+  },
+  {
+   "cell_type": "code",
+   "execution_count": 0,
+   "metadata": {
+    "application/vnd.databricks.v1+cell": {
+     "cellMetadata": {
+      "byteLimit": 2048000,
+      "rowLimit": 10000
+     },
+     "inputWidgets": {},
+     "nuid": "bd3f16f7-ce2e-4146-a230-68ebbb36450a",
+     "showTitle": false,
+     "title": ""
+    }
+   },
+   "outputs": [],
+   "source": [
+    "historical_repository_path = repository.path\n",
+    "\n",
+    "# The /dbfs FUSE mount exposes DBFS paths to the local File API.\n",
+    "with open(f\"/dbfs{historical_repository_path}\", \"r\") as f:\n",
+    "    print(f.read())"
+   ]
+  },
+  {
+   "cell_type": "markdown",
+   "metadata": {
+    "application/vnd.databricks.v1+cell": {
+     "cellMetadata": {
+      "byteLimit": 2048000,
+      "rowLimit": 10000
+     },
+     "inputWidgets": {},
+     "nuid": "27326ab8-d587-448d-b576-84fe32b8047d",
+     "showTitle": false,
+     "title": ""
+    }
+   },
+   "source": [
+    "#### 2) Create FileSystemMetricsRepository with a managed path"
+   ]
+  },
+  {
+   "cell_type": "markdown",
+   "metadata": {
+    "application/vnd.databricks.v1+cell": {
+     "cellMetadata": {
+      "byteLimit": 2048000,
+      "rowLimit": 10000
+     },
+     "inputWidgets": {},
+     "nuid": "aee23203-1791-4453-b20a-412320e12642",
+     "showTitle": false,
+     "title": ""
+    }
+   },
+   "source": [
+    "##### 2.1) Define the target path"
+   ]
+  },
+  {
+   "cell_type": "code",
+   "execution_count": 0,
+   "metadata": {
+    "application/vnd.databricks.v1+cell": {
+     "cellMetadata": {
+      "byteLimit": 2048000,
+      "rowLimit": 10000
+     },
+     "inputWidgets": {},
+     "nuid": "7437cd9c-6585-486c-a071-165cdbdc615e",
+     "showTitle": false,
+     "title": ""
+    }
+   },
+   "outputs": [],
+   "source": [
+    "import os\n",
+    "\n",
+    "os.makedirs(\"/dbfs/table_xyz\", exist_ok=True)\n",
+    "target_metrics_file_path = \"table_xyz/metrics.json\""
+   ]
+  },
+  {
+   "cell_type": "markdown",
+   "metadata": {
+    "application/vnd.databricks.v1+cell": {
+     "cellMetadata": {
+      "byteLimit": 2048000,
+      "rowLimit": 10000
+     },
+     "inputWidgets": {},
+     "nuid": "1e9bd69a-4d25-4803-a941-b91eeca447b4",
+     "showTitle": false,
+     "title": ""
+    }
+   },
+   "source": [
+    "##### 2.2) Copy the historical metrics.json to the target path"
+   ]
+  },
+  {
+   "cell_type": "code",
+   "execution_count": 0,
+   "metadata": {
+    "application/vnd.databricks.v1+cell": {
+     "cellMetadata": {
+      "byteLimit": 2048000,
+      "rowLimit": 10000
+     },
+     "inputWidgets": {},
+     "nuid": "c49738f4-9cda-4383-b637-6dad483b3b24",
+     "showTitle": false,
+     "title": ""
+    }
+   },
+   "outputs": [],
+   "source": [
+    "import shutil\n",
+    "\n",
+    "shutil.copyfile(\n",
+    "    src=f\"/dbfs{historical_repository_path}\",\n",
+    "    dst=f\"/dbfs/{target_metrics_file_path}\"\n",
+    ")"
+   ]
+  },
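+  {
+   "cell_type": "markdown",
+   "metadata": {},
+   "source": [
+    "Alternatively, the same copy can be expressed with Databricks utilities against Spark API (dbfs:/) paths instead of the /dbfs FUSE mount - a sketch assuming dbutils is available, as it is by default in Databricks notebooks:"
+   ]
+  },
+  {
+   "cell_type": "code",
+   "execution_count": 0,
+   "metadata": {},
+   "outputs": [],
+   "source": [
+    "# Equivalent copy via dbutils; note the Spark API path format.\n",
+    "dbutils.fs.cp(\n",
+    "    f\"dbfs:{historical_repository_path}\",\n",
+    "    f\"dbfs:/{target_metrics_file_path}\"\n",
+    ")"
+   ]
+  },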
+  {
+   "cell_type": "markdown",
+   "metadata": {
+    "application/vnd.databricks.v1+cell": {
+     "cellMetadata": {
+      "byteLimit": 2048000,
+      "rowLimit": 10000
+     },
+     "inputWidgets": {},
+     "nuid": "66967785-b567-4fc4-bf2b-d6d15c00c14b",
+     "showTitle": false,
+     "title": ""
+    }
+   },
+   "source": [
+    "##### 2.3) Initialize FileSystemMetricsRepository with the managed path\n",
+    "\n",
+    "Note: Manage the repository file using the File API path format (/dbfs/...), but provide the Spark API format (dbfs:/...) to [FileSystemMetricsRepository](https://github.com/awslabs/deequ/blob/master/src/main/scala/com/amazon/deequ/repository/fs/FileSystemMetricsRepository.scala)."
+   ]
+  },
+  {
+   "cell_type": "code",
+   "execution_count": 0,
+   "metadata": {
+    "application/vnd.databricks.v1+cell": {
+     "cellMetadata": {
+      "byteLimit": 2048000,
+      "rowLimit": 10000
+     },
+     "inputWidgets": {},
+     "nuid": "555df047-1c6d-44df-aec0-b3730d8931f0",
+     "showTitle": false,
+     "title": ""
+    }
+   },
+   "outputs": [],
+   "source": [
+    "metrics_spark_api = f\"dbfs:/{target_metrics_file_path}\"\n",
+    "\n",
+    "repository = FileSystemMetricsRepository(spark, metrics_spark_api)\n",
+    "print(repository.path)"
+   ]
+  },
+  {
+   "cell_type": "markdown",
+   "metadata": {
+    "application/vnd.databricks.v1+cell": {
+     "cellMetadata": {
+      "byteLimit": 2048000,
+      "rowLimit": 10000
+     },
+     "inputWidgets": {},
+     "nuid": "53716975-d2ed-4dc4-9f5f-c0805c2d9f67",
+     "showTitle": false,
+     "title": ""
+    }
+   },
+   "source": [
+    "#### 3) Run anomaly checks"
+   ]
+  },
+  {
+   "cell_type": "code",
+   "execution_count": 0,
+   "metadata": {
+    "application/vnd.databricks.v1+cell": {
+     "cellMetadata": {
+      "byteLimit": 2048000,
+      "rowLimit": 10000
+     },
+     "inputWidgets": {},
+     "nuid": "8d3984da-2324-4a5e-aa07-2d05ead562c7",
+     "showTitle": false,
+     "title": ""
+    }
+   },
+   "outputs": [],
+   "source": [
+    "# Three rows with a mean age of 30 - a 50% increase over the historical mean of 20.\n",
+    "df_xyz = spark.createDataFrame([{COLUMN_NAME: 19}, {COLUMN_NAME: 21}, {COLUMN_NAME: 50}])\n",
+    "\n",
+    "verification_suite = (\n",
+    "    VerificationSuite(spark)\n",
+    "    .onData(df_xyz)\n",
+    "    .useRepository(repository)\n",
+    "    .saveOrAppendResult(\n",
+    "        ResultKey(\n",
+    "            spark,\n",
+    "            ResultKey.current_milli_time()\n",
+    "        )\n",
+    "    )\n",
+    "    .addAnomalyCheck(\n",
+    "        RelativeRateOfChangeStrategy(\n",
+    "            maxRateDecrease=0.8,\n",
+    "            maxRateIncrease=1.2\n",
+    "        ),\n",
+    "        Mean(COLUMN_NAME)\n",
+    "    )\n",
+    ")\n",
+    "results = verification_suite.run()"
+   ]
+  },
+  {
+   "cell_type": "markdown",
+   "metadata": {
+    "application/vnd.databricks.v1+cell": {
+     "cellMetadata": {
+      "byteLimit": 2048000,
+      "rowLimit": 10000
+     },
+     "inputWidgets": {},
+     "nuid": "222a856f-716c-47bc-be14-1506de64f8d5",
+     "showTitle": false,
+     "title": ""
+    }
+   },
+   "source": [
+    "##### 3.1) Validate that historical metrics were taken into account when calculating the anomaly\n",
+    "\n",
+    "The new data has a mean age of 30, while the historical data has a mean age of 20. The RelativeRateOfChangeStrategy check should fail, as the accepted rate of change is +/-20%."
+   ]
+  },
+  {
+   "cell_type": "code",
+   "execution_count": 0,
+   "metadata": {
+    "application/vnd.databricks.v1+cell": {
+     "cellMetadata": {
+      "byteLimit": 2048000,
+      "rowLimit": 10000
+     },
+     "inputWidgets": {},
+     "nuid": "9c43827c-68f2-406c-814d-951353a173bd",
+     "showTitle": false,
+     "title": ""
+    }
+   },
+   "outputs": [],
+   "source": [
+    "results.checkResultsAsDataFrame(spark_session=spark, verificationResult=results).display()"
+   ]
+  },
+  {
+   "cell_type": "markdown",
+   "metadata": {
+    "application/vnd.databricks.v1+cell": {
+     "cellMetadata": {
+      "byteLimit": 2048000,
+      "rowLimit": 10000
+     },
+     "inputWidgets": {},
+     "nuid": "4385b496-0cdf-430e-93d2-46ff6f26bf78",
+     "showTitle": false,
+     "title": ""
+    }
+   },
+   "source": [
+    "##### 3.2) Validate that the repository JSON is updated after the VerificationSuite run"
+   ]
+  },
+  {
+   "cell_type": "code",
+   "execution_count": 0,
+   "metadata": {
+    "application/vnd.databricks.v1+cell": {
+     "cellMetadata": {
+      "byteLimit": 2048000,
+      "rowLimit": 10000
+     },
+     "inputWidgets": {},
+     "nuid": "68d72b50-e7c2-4527-88d0-3ddc8d686e86",
+     "showTitle": false,
+     "title": ""
+    }
+   },
+   "outputs": [],
+   "source": [
+    "import json\n",
+    "\n",
+    "with open(f\"/dbfs/{target_metrics_file_path}\", \"r\", encoding=\"utf-8\") as file:\n",
+    "    repository_str = file.read()\n",
+    "\n",
+    "# Raises an error if the updated file is not valid JSON.\n",
+    "json.loads(repository_str)"
+   ]
+  }
+ ],
+ "metadata": {
+  "application/vnd.databricks.v1+notebook": {
+   "dashboards": [],
+   "language": "python",
+   "notebookMetadata": {
+    "pythonIndentUnit": 4
+   },
+   "notebookName": "pydeequ: repository_sync_dbfs",
+   "widgets": {}
+  }
+ },
+ "nbformat": 4,
+ "nbformat_minor": 0
+}