From 405d475c24380b5eb4f4cf60b540d42a834ed30c Mon Sep 17 00:00:00 2001 From: "FAREAST\\yuanjieluo" Date: Tue, 9 Jul 2024 10:46:49 +0800 Subject: [PATCH] Add Databricks community templates --- .../Databricks - Run Serverless Notebook.json | 290 ++++++++++++++++++ .../manifest.json | 36 +++ .../Databricks - Trigger Workflow.json | 288 +++++++++++++++++ .../manifest.json | 37 +++ 4 files changed, 651 insertions(+) create mode 100644 templates/Databricks - Run Serverless Notebook/Databricks - Run Serverless Notebook.json create mode 100644 templates/Databricks - Run Serverless Notebook/manifest.json create mode 100644 templates/Databricks - Trigger Workflow/Databricks - Trigger Workflow.json create mode 100644 templates/Databricks - Trigger Workflow/manifest.json diff --git a/templates/Databricks - Run Serverless Notebook/Databricks - Run Serverless Notebook.json b/templates/Databricks - Run Serverless Notebook/Databricks - Run Serverless Notebook.json new file mode 100644 index 00000000..ba56e995 --- /dev/null +++ b/templates/Databricks - Run Serverless Notebook/Databricks - Run Serverless Notebook.json @@ -0,0 +1,290 @@ +{ + "$schema": "http://schema.management.azure.com/schemas/2015-01-01/deploymentTemplate.json#", + "contentVersion": "1.0.0.0", + "parameters": { + "factoryName": { + "type": "string", + "metadata": "Data Factory name" + } + }, + "variables": { + "factoryId": "[concat('Microsoft.DataFactory/factories/', parameters('factoryName'))]" + }, + "resources": [ + { + "name": "[concat(parameters('factoryName'), '/Databricks - Run Serverless Notebook')]", + "type": "Microsoft.DataFactory/factories/pipelines", + "apiVersion": "2018-06-01", + "properties": { + "description": "This template can be used to submit notebook runs to a Databricks Workspace using serverless jobs compute.", + "activities": [ + { + "name": "Execute Jobs API", + "description": "This task submits the notebook to the run submit API endpoint.", + "type": "WebActivity", + "dependsOn": [], + 
"policy": { + "timeout": "0.00:10:00", + "retry": 3, + "retryIntervalInSeconds": 30, + "secureOutput": false, + "secureInput": false + }, + "userProperties": [], + "typeProperties": { + "method": "POST", + "headers": {}, + "url": { + "value": "@concat('https://',pipeline().parameters.DatabricksWorkspaceID,'.azuredatabricks.net/api/2.1/jobs/runs/submit')", + "type": "Expression" + }, + "body": { + "value": "{\n \"run_name\": \"@{pipeline().parameters.DatabricksRunName}\",\n \"tasks\": [\n {\n \"task_key\": \"@{pipeline().parameters.DatabricksRunName}\",\n \"notebook_task\": {\n \"base_parameters\": @{pipeline().parameters.DatabricksNotebookParameters},\n \"notebook_path\": \"@{pipeline().parameters.DatabricksNotebookPath}\",\n \"source\": \"WORKSPACE\"\n }\n }\n ]\n}\n", + "type": "Expression" + }, + "authentication": { + "type": "MSI", + "resource": "2ff814a6-3304-4ab8-85cb-cd0e6f879c1d" + } + } + }, + { + "name": "Wait Until Job Completes", + "type": "Until", + "dependsOn": [ + { + "activity": "Execute Jobs API", + "dependencyConditions": [ + "Succeeded" + ] + } + ], + "userProperties": [], + "typeProperties": { + "expression": { + "value": "@not(equals(variables('JobStatus'),'Running'))", + "type": "Expression" + }, + "activities": [ + { + "name": "Check Job Run API", + "type": "WebActivity", + "dependsOn": [], + "policy": { + "timeout": "0.00:10:00", + "retry": 3, + "retryIntervalInSeconds": 60, + "secureOutput": false, + "secureInput": false + }, + "userProperties": [], + "typeProperties": { + "method": "GET", + "headers": {}, + "url": { + "value": "@concat('https://',pipeline().parameters.DatabricksWorkspaceID,'.azuredatabricks.net/api/2.1/jobs/runs/get?run_id=',activity('Execute Jobs API').output.run_id)", + "type": "Expression" + }, + "body": { + "job_id": 3895 + }, + "authentication": { + "type": "MSI", + "resource": "2ff814a6-3304-4ab8-85cb-cd0e6f879c1d" + } + } + }, + { + "name": "Set Job Status", + "type": "SetVariable", + "dependsOn": [ + { + 
"activity": "Check Job Run API", + "dependencyConditions": [ + "Succeeded" + ] + } + ], + "policy": { + "secureOutput": false, + "secureInput": false + }, + "userProperties": [], + "typeProperties": { + "variableName": "JobStatus", + "value": { + "value": "@if(\nor(\nequals(activity('Check Job Run API').output.state.life_cycle_state, 'PENDING'), equals(activity('Check Job Run API').output.state.life_cycle_state, 'RUNNING')\n), \n'Running',\nactivity('Check Job Run API').output.state.result_state\n)", + "type": "Expression" + } + } + }, + { + "name": "Wait to Recheck API", + "type": "Wait", + "dependsOn": [ + { + "activity": "Set Job Status", + "dependencyConditions": [ + "Succeeded" + ] + } + ], + "userProperties": [], + "typeProperties": { + "waitTimeInSeconds": { + "value": "@pipeline().parameters.ADFWaitSeconds", + "type": "Expression" + } + } + } + ], + "timeout": "7.00:00:00" + } + }, + { + "name": "Check Job Status", + "type": "IfCondition", + "dependsOn": [ + { + "activity": "Wait Until Job Completes", + "dependencyConditions": [ + "Succeeded" + ] + } + ], + "userProperties": [], + "typeProperties": { + "expression": { + "value": "@equals(variables('JobStatus'),'SUCCESS')", + "type": "Expression" + }, + "ifFalseActivities": [ + { + "name": "Databricks Job Fail", + "description": "The Databricks job has failed.", + "type": "Fail", + "dependsOn": [], + "userProperties": [], + "typeProperties": { + "message": { + "value": "@concat('Job Run URL: ', activity('Check Job Run API').output.run_page_url)", + "type": "Expression" + }, + "errorCode": "Databricks Error" + } + } + ] + } + }, + { + "name": "Get Job Run URL", + "description": "Get's the job run URL after the run has been submitted.", + "type": "WebActivity", + "dependsOn": [ + { + "activity": "Execute Jobs API", + "dependencyConditions": [ + "Succeeded" + ] + } + ], + "policy": { + "timeout": "0.00:10:00", + "retry": 3, + "retryIntervalInSeconds": 30, + "secureOutput": false, + "secureInput": false + }, + 
"userProperties": [], + "typeProperties": { + "method": "GET", + "headers": {}, + "url": { + "value": "@concat('https://',pipeline().parameters.DatabricksWorkspaceID,'.azuredatabricks.net/api/2.1/jobs/runs/get?run_id=',activity('Execute Jobs API').output.run_id)", + "type": "Expression" + }, + "body": { + "job_id": 3895 + }, + "authentication": { + "type": "MSI", + "resource": "2ff814a6-3304-4ab8-85cb-cd0e6f879c1d" + } + } + }, + { + "name": "Set Run Page URL", + "type": "SetVariable", + "dependsOn": [ + { + "activity": "Get Job Run URL", + "dependencyConditions": [ + "Succeeded" + ] + } + ], + "policy": { + "secureOutput": false, + "secureInput": false + }, + "userProperties": [], + "typeProperties": { + "variableName": "pipelineReturnValue", + "value": [ + { + "key": "jobRunURL", + "value": { + "type": "Expression", + "content": "@activity('Get Job Run URL').output.run_page_url" + } + } + ], + "setSystemVariable": true + } + } + ], + "policy": { + "elapsedTimeMetric": {} + }, + "parameters": { + "DatabricksNotebookPath": { + "type": "string", + "defaultValue": "/Workspace/Shared/guanjie_shared/test" + }, + "DatabricksWorkspaceID": { + "type": "string", + "defaultValue": "adb-193752524297111.11" + }, + "ADFWaitSeconds": { + "type": "int", + "defaultValue": 30 + }, + "DatabricksNotebookParameters": { + "type": "string", + "defaultValue": { + "Example_ETL_PARAM": "99" + } + }, + "DatabricksRunName": { + "type": "string", + "defaultValue": "ADFServerlessJob" + } + }, + "variables": { + "JobStatus": { + "type": "String", + "defaultValue": "Running" + } + }, + "folder": { + "name": "Custom Databricks Activities" + }, + "annotations": [ + "Databricks" + ], + "lastPublishTime": "2024-06-13T04:49:32Z" + }, + "dependsOn": [] + } + ] +} \ No newline at end of file diff --git a/templates/Databricks - Run Serverless Notebook/manifest.json b/templates/Databricks - Run Serverless Notebook/manifest.json new file mode 100644 index 00000000..454ac5ef --- /dev/null +++ 
b/templates/Databricks - Run Serverless Notebook/manifest.json @@ -0,0 +1,36 @@ +{ + "name": "Databricks - Run Serverless Notebook", + "description": "This template can be used to submit Databricks notebook runs to a Databricks Workspace using serverless jobs compute.\n\nNote: This template uses the ADF MSI to authenticate to Azure Databricks.", + "image": "WebExecute Jobs APIUntilWait Until JobCompletesActivitiesCheck JobRun APISet JobStatusWait toRecheck...+If ConditionCheck Job StatusTrue+FalseDatabricksJob Fail+WebGet Job Run URLSet variableSet Run Page URL", + "icons": [ + "WebActivity", + "Until", + "IfCondition" + ], + "requires": { + "linkedservices": {} + }, + "documentation": "https://medium.com/@gshen92/40230d03e095", + "author": "Guanjie Shen", + "contributorType": "Community", + "annotations": [ + "Databricks", + "Azure Databricks", + "Spark", + "ETL" + ], + "services": [ + "Azure Databricks" + ], + "categories": [ + "Data Movement", + "Integration", + "Web activity", + "Analytics", + "AI Machine Learning", + "Data flow", + "Data Masking", + "Copy", + "Transform" + ] +} \ No newline at end of file diff --git a/templates/Databricks - Trigger Workflow/Databricks - Trigger Workflow.json b/templates/Databricks - Trigger Workflow/Databricks - Trigger Workflow.json new file mode 100644 index 00000000..7a91aff3 --- /dev/null +++ b/templates/Databricks - Trigger Workflow/Databricks - Trigger Workflow.json @@ -0,0 +1,288 @@ +{ + "$schema": "http://schema.management.azure.com/schemas/2015-01-01/deploymentTemplate.json#", + "contentVersion": "1.0.0.0", + "parameters": { + "factoryName": { + "type": "string", + "metadata": "Data Factory name" + } + }, + "variables": { + "factoryId": "[concat('Microsoft.DataFactory/factories/', parameters('factoryName'))]" + }, + "resources": [ + { + "name": "[concat(parameters('factoryName'), '/Databricks - Trigger Workflow')]", + "type": "Microsoft.DataFactory/factories/pipelines", + "apiVersion": "2018-06-01", + "properties": { 
+ "description": "This template is used to trigger the execution of an existing Databricks Workflow in a given workspace.", + "activities": [ + { + "name": "Execute Workflow Run API", + "description": "This task submits the notebook to the run submit API endpoint.", + "type": "WebActivity", + "dependsOn": [], + "policy": { + "timeout": "0.00:10:00", + "retry": 3, + "retryIntervalInSeconds": 30, + "secureOutput": false, + "secureInput": false + }, + "userProperties": [], + "typeProperties": { + "method": "POST", + "headers": {}, + "url": { + "value": "@concat('https://',pipeline().parameters.DatabricksWorkspaceID,'.azuredatabricks.net/api/2.1/jobs/run-now')", + "type": "Expression" + }, + "body": { + "value": "{\n \"job_id\": \"@{pipeline().parameters.DatabricksJobID}\",\n \"job_parameters\": @{pipeline().parameters.DatabricksJobParameters}\n}\n", + "type": "Expression" + }, + "authentication": { + "type": "MSI", + "resource": "2ff814a6-3304-4ab8-85cb-cd0e6f879c1d" + } + } + }, + { + "name": "Wait Until Job Completes", + "type": "Until", + "dependsOn": [ + { + "activity": "Execute Workflow Run API", + "dependencyConditions": [ + "Succeeded" + ] + } + ], + "userProperties": [], + "typeProperties": { + "expression": { + "value": "@not(equals(variables('JobStatus'),'Running'))", + "type": "Expression" + }, + "activities": [ + { + "name": "Check Job Run API", + "type": "WebActivity", + "dependsOn": [], + "policy": { + "timeout": "0.00:10:00", + "retry": 3, + "retryIntervalInSeconds": 30, + "secureOutput": false, + "secureInput": false + }, + "userProperties": [], + "typeProperties": { + "method": "GET", + "headers": {}, + "url": { + "value": "@concat('https://',pipeline().parameters.DatabricksWorkspaceID,'.azuredatabricks.net/api/2.1/jobs/runs/get?run_id=',activity('Execute Workflow Run API').output.run_id)", + "type": "Expression" + }, + "body": { + "job_id": 3895 + }, + "authentication": { + "type": "MSI", + "resource": "2ff814a6-3304-4ab8-85cb-cd0e6f879c1d" + } + 
} + }, + { + "name": "Set Job Status", + "type": "SetVariable", + "dependsOn": [ + { + "activity": "Check Job Run API", + "dependencyConditions": [ + "Succeeded" + ] + } + ], + "policy": { + "secureOutput": false, + "secureInput": false + }, + "userProperties": [], + "typeProperties": { + "variableName": "JobStatus", + "value": { + "value": "@if(\nor(\nequals(activity('Check Job Run API').output.state.life_cycle_state, 'PENDING'), equals(activity('Check Job Run API').output.state.life_cycle_state, 'RUNNING')\n), \n'Running',\nactivity('Check Job Run API').output.state.result_state\n)", + "type": "Expression" + } + } + }, + { + "name": "Wait to Recheck API", + "type": "Wait", + "dependsOn": [ + { + "activity": "Set Job Status", + "dependencyConditions": [ + "Succeeded" + ] + } + ], + "userProperties": [], + "typeProperties": { + "waitTimeInSeconds": { + "value": "@pipeline().parameters.ADFWaitSeconds", + "type": "Expression" + } + } + } + ], + "timeout": "7.00:00:00" + } + }, + { + "name": "Check Job Status", + "description": "Check the status of the run.", + "type": "IfCondition", + "dependsOn": [ + { + "activity": "Wait Until Job Completes", + "dependencyConditions": [ + "Succeeded" + ] + } + ], + "userProperties": [], + "typeProperties": { + "expression": { + "value": "@equals(variables('JobStatus'),'SUCCESS')", + "type": "Expression" + }, + "ifFalseActivities": [ + { + "name": "Databricks Job Fail", + "description": "The Databricks job has failed.", + "type": "Fail", + "dependsOn": [], + "userProperties": [], + "typeProperties": { + "message": { + "value": "@concat('Job Run URL: ', activity('Check Job Run API').output.run_page_url)", + "type": "Expression" + }, + "errorCode": "Databricks Error" + } + } + ] + } + }, + { + "name": "Get Job Run URL", + "description": "This get the Job Run ID URL after the run has been submitted.", + "type": "WebActivity", + "dependsOn": [ + { + "activity": "Execute Workflow Run API", + "dependencyConditions": [ + "Succeeded" + ] + 
} + ], + "policy": { + "timeout": "0.00:10:00", + "retry": 3, + "retryIntervalInSeconds": 30, + "secureOutput": false, + "secureInput": false + }, + "userProperties": [], + "typeProperties": { + "method": "GET", + "headers": {}, + "url": { + "value": "@concat('https://',pipeline().parameters.DatabricksWorkspaceID,'.azuredatabricks.net/api/2.1/jobs/runs/get?run_id=',activity('Execute Workflow Run API').output.run_id)", + "type": "Expression" + }, + "body": { + "job_id": 3895 + }, + "authentication": { + "type": "MSI", + "resource": "2ff814a6-3304-4ab8-85cb-cd0e6f879c1d" + } + } + }, + { + "name": "Set Run Page URL", + "description": "Set the URL for the Run Page.", + "type": "SetVariable", + "dependsOn": [ + { + "activity": "Get Job Run URL", + "dependencyConditions": [ + "Succeeded" + ] + } + ], + "policy": { + "secureOutput": false, + "secureInput": false + }, + "userProperties": [], + "typeProperties": { + "variableName": "pipelineReturnValue", + "value": [ + { + "key": "jobRunURL", + "value": { + "type": "Expression", + "content": "@activity('Get Job Run URL').output.run_page_url" + } + } + ], + "setSystemVariable": true + } + } + ], + "policy": { + "elapsedTimeMetric": {} + }, + "parameters": { + "DatabricksWorkspaceID": { + "type": "string", + "defaultValue": "adb-193752524297111.11" + }, + "ADFWaitSeconds": { + "type": "int", + "defaultValue": 30 + }, + "DatabricksJobParameters": { + "type": "string", + "defaultValue": { + "Example_ETL_PARAM": "99" + } + }, + "DatabricksJobID": { + "type": "string", + "defaultValue": "68933248857869" + } + }, + "variables": { + "JobStatus": { + "type": "String", + "defaultValue": "Running" + } + }, + "folder": { + "name": "Custom Databricks Activities" + }, + "annotations": [ + "Databricks" + ], + "lastPublishTime": "2024-06-13T04:53:14Z" + }, + "dependsOn": [] + } + ] +} \ No newline at end of file diff --git a/templates/Databricks - Trigger Workflow/manifest.json b/templates/Databricks - Trigger Workflow/manifest.json new 
file mode 100644 index 00000000..db5dcc8d --- /dev/null +++ b/templates/Databricks - Trigger Workflow/manifest.json @@ -0,0 +1,36 @@ +{ + "name": "Databricks - Trigger Workflow", + "description": "This template is used to trigger the execution of an existing Databricks Workflow through Azure Data Factory.\n\nNote: This template uses the ADF MSI to authenticate to Azure Databricks.", + "image": "WebExecute WorkflowRun APIUntilWait Until JobCompletesActivitiesCheck JobRun APISet JobStatusWait toRecheck...+If ConditionCheck Job StatusTrue+FalseDatabricksJob Fail+WebGet Job Run URLSet variableSet Run Page URL", + "icons": [ + "WebActivity", + "Until", + "IfCondition" + ], + "requires": { + "linkedservices": {} + }, + "documentation": "https://medium.com/@gshen92/40230d03e095", + "author": "Guanjie Shen", + "contributorType": "Community", + "annotations": [ + "Databricks", + "Azure Databricks", + "Spark", + "ETL" + ], + "services": [ + "Azure Databricks" + ], + "categories": [ + "Data Movement", + "Integration", + "Web activity", + "Analytics", + "AI Machine Learning", + "Data flow", + "Data Masking", + "Copy", + "Transform" + ] +} \ No newline at end of file