diff --git a/.DS_Store b/.DS_Store new file mode 100644 index 00000000..67d0b720 Binary files /dev/null and b/.DS_Store differ diff --git a/SamplesV2/.DS_Store b/SamplesV2/.DS_Store index 341707db..0a4180c4 100644 Binary files a/SamplesV2/.DS_Store and b/SamplesV2/.DS_Store differ diff --git a/SamplesV2/ChangeDataCapture/.DS_Store b/SamplesV2/ChangeDataCapture/.DS_Store new file mode 100644 index 00000000..4d95beb5 Binary files /dev/null and b/SamplesV2/ChangeDataCapture/.DS_Store differ diff --git a/SamplesV2/ChangeDataCapture/images/adbcell16.png b/SamplesV2/ChangeDataCapture/images/adbcell16.png old mode 100644 new mode 100755 index 3ec39770..48e1a2f9 Binary files a/SamplesV2/ChangeDataCapture/images/adbcell16.png and b/SamplesV2/ChangeDataCapture/images/adbcell16.png differ diff --git a/SamplesV2/ChangeDataCapture/images/adbcreatecluster.png b/SamplesV2/ChangeDataCapture/images/adbcreatecluster.png old mode 100644 new mode 100755 index 8a5528d3..3062e6cb Binary files a/SamplesV2/ChangeDataCapture/images/adbcreatecluster.png and b/SamplesV2/ChangeDataCapture/images/adbcreatecluster.png differ diff --git a/SamplesV2/ChangeDataCapture/images/adfAllUserinputs.png b/SamplesV2/ChangeDataCapture/images/adfAllUserinputs.png old mode 100644 new mode 100755 index 2ecdc41a..9a865f7d Binary files a/SamplesV2/ChangeDataCapture/images/adfAllUserinputs.png and b/SamplesV2/ChangeDataCapture/images/adfAllUserinputs.png differ diff --git a/SamplesV2/ChangeDataCapture/images/adftemplateUserinputs.png b/SamplesV2/ChangeDataCapture/images/adftemplateUserinputs.png old mode 100644 new mode 100755 index 4ab26941..34515e3a Binary files a/SamplesV2/ChangeDataCapture/images/adftemplateUserinputs.png and b/SamplesV2/ChangeDataCapture/images/adftemplateUserinputs.png differ diff --git a/SamplesV2/ChangeDataCapture/usecases/.DS_Store b/SamplesV2/ChangeDataCapture/usecases/.DS_Store new file mode 100644 index 00000000..e1ba3d68 Binary files /dev/null and b/SamplesV2/ChangeDataCapture/usecases/.DS_Store differ diff --git a/SamplesV2/ChangeDataCapture/usecases/cdc/.DS_Store b/SamplesV2/ChangeDataCapture/usecases/cdc/.DS_Store new file mode 100644 index 00000000..6675fc20 Binary files /dev/null and b/SamplesV2/ChangeDataCapture/usecases/cdc/.DS_Store differ diff --git a/SamplesV2/ChangeDataCapture/usecases/cdc/code/.DS_Store b/SamplesV2/ChangeDataCapture/usecases/cdc/code/.DS_Store new file mode 100644 index 00000000..ee29b784 Binary files /dev/null and b/SamplesV2/ChangeDataCapture/usecases/cdc/code/.DS_Store differ diff --git a/SamplesV2/ChangeDataCapture/usecases/cdc/code/notebooks/autoloadersp.ipynb b/SamplesV2/ChangeDataCapture/usecases/cdc/code/notebooks/autoloadersp.ipynb index eed0aa01..664eebd0 100644 --- a/SamplesV2/ChangeDataCapture/usecases/cdc/code/notebooks/autoloadersp.ipynb +++ b/SamplesV2/ChangeDataCapture/usecases/cdc/code/notebooks/autoloadersp.ipynb @@ -1 +1 @@ -{"cells":[{"cell_type":"code","execution_count":null,"metadata":{"application/vnd.databricks.v1+cell":{"inputWidgets":{},"nuid":"e4c95154-bb9a-4815-a33a-db28d83a37c3","showTitle":true,"title":"Define the variables used for creating connection stringsb"}},"outputs":[{"data":{"text/html":["\n","
"]},"metadata":{"application/vnd.databricks.v1+output":{"addedWidgets":{},"arguments":{},"data":"
","datasetInfos":[],"metadata":{},"removedWidgets":[],"type":"html"}},"output_type":"display_data"}],"source":["adlsAccountName = \"\"\n","sourceAdlsContainerName = \"bronze\"\n","sinkAdlsContainerName = \"silver\"\n","sourceAdlsFolderName = \"CDC/Sales/Microsoft/AdventureWorksLT/SalesLT/Address\"\n","sinkAdlsFolderName = \"CDC/Sales/Microsoft/AdventureWorksLT/SalesLT/Address\""]},{"cell_type":"code","execution_count":null,"metadata":{"application/vnd.databricks.v1+cell":{"inputWidgets":{},"nuid":"c413f414-4370-4ec7-bf74-d725259a8f55","showTitle":false,"title":""}},"outputs":[{"data":{"text/html":["\n","
"]},"metadata":{"application/vnd.databricks.v1+output":{"addedWidgets":{},"arguments":{},"data":"
","datasetInfos":[],"metadata":{},"removedWidgets":[],"type":"html"}},"output_type":"display_data"}],"source":["spark.conf.set(\n"," \"fs.azure.account.key.\" + adlsAccountName + \".dfs.core.windows.net\",\n"," dbutils.secrets.get(scope=\"\",key=\"Adls2-KeySecret\"))"]},{"cell_type":"code","execution_count":null,"metadata":{"application/vnd.databricks.v1+cell":{"inputWidgets":{},"nuid":"98676d8b-3241-45a2-9a85-e6d8e71ef65c","showTitle":false,"title":""}},"outputs":[{"data":{"text/html":["\n","
Out[3]: [FileInfo(path='abfss://bronze@dataccelerr267cb5wtgfxg.dfs.core.windows.net/CAAPP/', name='CAAPP/', size=0)]
"]},"metadata":{"application/vnd.databricks.v1+output":{"addedWidgets":{},"arguments":{},"data":"
Out[3]: [FileInfo(path='abfss://bronze@dataccelerr267cb5wtgfxg.dfs.core.windows.net/CAAPP/', name='CAAPP/', size=0)]
","datasetInfos":[],"metadata":{},"removedWidgets":[],"type":"html"}},"output_type":"display_data"}],"source":["dbutils.fs.ls(\"abfss://\" + sourceAdlsContainerName + \"@\" + adlsAccountName + \".dfs.core.windows.net/\")"]},{"cell_type":"code","execution_count":null,"metadata":{"application/vnd.databricks.v1+cell":{"inputWidgets":{},"nuid":"b53d3657-2004-4a6f-807f-8e1d639b858a","showTitle":true,"title":"Get Secrets"}},"outputs":[{"data":{"text/html":["\n","
"]},"metadata":{"application/vnd.databricks.v1+output":{"addedWidgets":{},"arguments":{},"data":"
","datasetInfos":[],"metadata":{},"removedWidgets":[],"type":"html"}},"output_type":"display_data"}],"source":["SubscriptionID = dbutils.secrets.get(\"\",\"SubscriptionID\")\n","DirectoryID = dbutils.secrets.get(\"\",\"DirectoryID\")\n","ServicePrincipalAppID = dbutils.secrets.get(\"\",\"ServicePrincipalAppID\")\n","ServicePrincipalSecret = dbutils.secrets.get(\"\",\"AppSecret\")\n","ResourceGroup = dbutils.secrets.get(\"\",\"ResourceGroup\")\n","BlobConnectionKey = dbutils.secrets.get(\"\",\"Adls2-KeySecret\")"]},{"cell_type":"code","execution_count":null,"metadata":{"application/vnd.databricks.v1+cell":{"inputWidgets":{},"nuid":"1c20b8e8-085c-4fd8-91ea-c415150af7e7","showTitle":true,"title":"Configure authentication for mounting"}},"outputs":[{"data":{"text/html":["\n","
"]},"metadata":{"application/vnd.databricks.v1+output":{"addedWidgets":{},"arguments":{},"data":"
","datasetInfos":[],"metadata":{},"removedWidgets":[],"type":"html"}},"output_type":"display_data"}],"source":["configs = {\"fs.azure.account.auth.type\": \"OAuth\",\n"," \"fs.azure.account.oauth.provider.type\": \"org.apache.hadoop.fs.azurebfs.oauth2.ClientCredsTokenProvider\",\n"," \"fs.azure.account.oauth2.client.id\": ServicePrincipalAppID,\n"," \"fs.azure.account.oauth2.client.secret\": ServicePrincipalSecret,\n"," \"fs.azure.account.oauth2.client.endpoint\": \"https://login.microsoftonline.com/\"+DirectoryID+\"/oauth2/token\"}"]},{"cell_type":"code","execution_count":null,"metadata":{"application/vnd.databricks.v1+cell":{"inputWidgets":{},"nuid":"9a21418c-bfa6-4c8b-a975-32d80f92ca25","showTitle":false,"title":""}},"outputs":[{"data":{"text/html":["\n","
/mnt/source has been unmounted.\n","/mnt/sink has been unmounted.\n","Out[6]: True
"]},"metadata":{"application/vnd.databricks.v1+output":{"addedWidgets":{},"arguments":{},"data":"
/mnt/source has been unmounted.\n/mnt/sink has been unmounted.\nOut[6]: True
","datasetInfos":[],"metadata":{},"removedWidgets":[],"type":"html"}},"output_type":"display_data"}],"source":["dbutils.fs.unmount(\"/mnt/source\")\n","dbutils.fs.unmount(\"/mnt/sink\") "]},{"cell_type":"code","execution_count":null,"metadata":{"application/vnd.databricks.v1+cell":{"inputWidgets":{},"nuid":"4d1cb073-3e54-4582-9efe-9bee659e15ff","showTitle":true,"title":"Mount Source and Sink filesystem"}},"outputs":[{"data":{"text/html":["\n","
Out[7]: True
"]},"metadata":{"application/vnd.databricks.v1+output":{"addedWidgets":{},"arguments":{},"data":"
Out[7]: True
","datasetInfos":[],"metadata":{},"removedWidgets":[],"type":"html"}},"output_type":"display_data"}],"source":["dbutils.fs.mount(\n"," source = \"abfss://\"+sourceAdlsContainerName+\"@\"+adlsAccountName+\".dfs.core.windows.net/\",\n"," mount_point = \"/mnt/source\",\n"," extra_configs = configs)\n","\n","\n","dbutils.fs.mount(\n"," source = \"abfss://\"+sinkAdlsContainerName+\"@\"+adlsAccountName+\".dfs.core.windows.net/\",\n"," mount_point = \"/mnt/sink\",\n"," extra_configs = configs)"]},{"cell_type":"code","execution_count":null,"metadata":{"application/vnd.databricks.v1+cell":{"inputWidgets":{},"nuid":"8d8df1d4-d435-4d65-bc58-dfb177cf4b22","showTitle":false,"title":""}},"outputs":[{"data":{"text/html":["\n","
Out[8]: [FileInfo(path='dbfs:/mnt/sink/', name='sink/', size=0),\n"," FileInfo(path='dbfs:/mnt/source/', name='source/', size=0)]
"]},"metadata":{"application/vnd.databricks.v1+output":{"addedWidgets":{},"arguments":{},"data":"
Out[8]: [FileInfo(path='dbfs:/mnt/sink/', name='sink/', size=0),\n FileInfo(path='dbfs:/mnt/source/', name='source/', size=0)]
","datasetInfos":[],"metadata":{},"removedWidgets":[],"type":"html"}},"output_type":"display_data"}],"source":["dbutils.fs.ls (\"/mnt/\")"]},{"cell_type":"markdown","metadata":{"application/vnd.databricks.v1+cell":{"inputWidgets":{},"nuid":"fe8d3625-630f-4f68-ae34-39f04feb53c3","showTitle":false,"title":""}},"source":["##Intialize AutoLoader"]},{"cell_type":"code","execution_count":null,"metadata":{"application/vnd.databricks.v1+cell":{"inputWidgets":{},"nuid":"ae93452a-d015-4c82-9dcf-dd6989a9fe41","showTitle":false,"title":""}},"outputs":[{"data":{"text/html":["\n","
"]},"metadata":{"application/vnd.databricks.v1+output":{"addedWidgets":{},"arguments":{},"data":"
","datasetInfos":[],"metadata":{},"removedWidgets":[],"type":"html"}},"output_type":"display_data"}],"source":["from pyspark.sql.types import *\n","import json\n","\n","jschema = '{\"fields\":[{\"metadata\":{},\"name\":\"AddressID\",\"nullable\":true,\"type\":\"integer\"},{\"metadata\":{},\"name\":\"AddressLine1\",\"nullable\":true,\"type\":\"string\"},{\"metadata\":{},\"name\":\"AddressLine2\",\"nullable\":true,\"type\":\"string\"},{\"metadata\":{},\"name\":\"City\",\"nullable\":true,\"type\":\"string\"},{\"metadata\":{},\"name\":\"StateProvince\",\"nullable\":true,\"type\":\"string\"},{\"metadata\":{},\"name\":\"CountryRegion\",\"nullable\":true,\"type\":\"string\"},{\"metadata\":{},\"name\":\"PostalCode\",\"nullable\":true,\"type\":\"string\"},{\"metadata\":{},\"name\":\"rowguid\",\"nullable\":true,\"type\":\"string\"},{\"metadata\":{},\"name\":\"ModifiedDate\",\"nullable\":true,\"type\":\"string\"}],\"type\":\"struct\"}'\n","\n","schema = StructType.fromJson(json.loads(jschema))"]},{"cell_type":"code","execution_count":null,"metadata":{"application/vnd.databricks.v1+cell":{"inputWidgets":{},"nuid":"bc877dbf-e0c1-4513-b620-743ca55e2f1b","showTitle":true,"title":"Build cloudFiles Config"}},"outputs":[{"data":{"text/html":["\n","
"]},"metadata":{"application/vnd.databricks.v1+output":{"addedWidgets":{},"arguments":{},"data":"
","datasetInfos":[],"metadata":{},"removedWidgets":[],"type":"html"}},"output_type":"display_data"}],"source":["cloudfile = {\n"," \"cloudFiles.subscriptionId\":SubscriptionID,\n"," \"cloudFiles.format\":\"csv\",\n"," \"cloudFiles.tenantId\":DirectoryID,\n"," \"cloudFiles.clientId\":ServicePrincipalAppID,\n"," \"cloudFiles.clientSecret\":ServicePrincipalSecret,\n"," \"cloudFiles.resourceGroup\":ResourceGroup,\n"," \"cloudFiles.useNotifications\": \"true\", \n","}"]},{"cell_type":"markdown","metadata":{"application/vnd.databricks.v1+cell":{"inputWidgets":{},"nuid":"33870c3d-ccc8-4d59-9ba2-f5544fae8162","showTitle":false,"title":""}},"source":["##Build Streaming Dataframe"]},{"cell_type":"code","execution_count":null,"metadata":{"application/vnd.databricks.v1+cell":{"inputWidgets":{},"nuid":"2c776687-6e12-4754-912c-eb72a477675f","showTitle":false,"title":""}},"outputs":[{"data":{"text/html":["\n","
"]},"metadata":{"application/vnd.databricks.v1+output":{"addedWidgets":{},"arguments":{},"data":"
","datasetInfos":[],"metadata":{},"removedWidgets":[],"type":"html"}},"output_type":"display_data"}],"source":["filePath = \"/mnt/source/\"+sourceAdlsFolderName+\"/\"\n","df = (spark\n"," .readStream\n"," .format(\"cloudFiles\")\n"," .schema(schema)\n"," .options(**cloudfile)\n"," .option(\"Header\",True)\n"," .option(\"cloudFiles.schemaLocation\",\"/mnt/source/\"+sourceAdlsFolderName+\"/_schema\") \n"," .load(filePath))"]},{"cell_type":"code","execution_count":null,"metadata":{"application/vnd.databricks.v1+cell":{"inputWidgets":{},"nuid":"19be7af0-05ea-4a40-9c3b-0f4e25818cb5","showTitle":false,"title":""}},"outputs":[{"data":{"text/html":["
file | count(1)
/mnt/source/CAAPP/Sales/Microsoft/adworkslt/SalesLT/Address/Address_2021-07-27T03:48:44.487.csv | 450
/mnt/source/CAAPP/Sales/Microsoft/adworkslt/SalesLT/Address/Address_2021-07-28T20:48:53.08.csv | 451
/mnt/source/CAAPP/Sales/Microsoft/adworkslt/SalesLT/Address/Address_2021-07-27T01:30:41.2.csv | 450
/mnt/source/CAAPP/Sales/Microsoft/adworkslt/SalesLT/Address/Address_2021-07-27T03:36:43.23.csv | 451
/mnt/source/CAAPP/Sales/Microsoft/adworkslt/SalesLT/Address/Address_2021-07-28T20:42:08.65.csv | 451
/mnt/source/CAAPP/Sales/Microsoft/adworkslt/SalesLT/Address/Address_2021-07-29T00:39:23.16.csv | 451
/mnt/source/CAAPP/Sales/Microsoft/adworkslt/SalesLT/Address/Address_2021-07-28T20:00:32.207.csv | 450
"]},"metadata":{"application/vnd.databricks.v1+output":{"addedWidgets":{},"aggData":[],"aggError":"","aggOverflow":false,"aggSchema":[],"aggSeriesLimitReached":false,"aggType":"","arguments":{},"columnCustomDisplayInfos":{},"data":[["/mnt/source/CAAPP/Sales/Microsoft/adworkslt/SalesLT/Address/Address_2021-07-27T03:48:44.487.csv",450],["/mnt/source/CAAPP/Sales/Microsoft/adworkslt/SalesLT/Address/Address_2021-07-28T20:48:53.08.csv",451],["/mnt/source/CAAPP/Sales/Microsoft/adworkslt/SalesLT/Address/Address_2021-07-27T01:30:41.2.csv",450],["/mnt/source/CAAPP/Sales/Microsoft/adworkslt/SalesLT/Address/Address_2021-07-27T03:36:43.23.csv",451],["/mnt/source/CAAPP/Sales/Microsoft/adworkslt/SalesLT/Address/Address_2021-07-28T20:42:08.65.csv",451],["/mnt/source/CAAPP/Sales/Microsoft/adworkslt/SalesLT/Address/Address_2021-07-29T00:39:23.16.csv",451],["/mnt/source/CAAPP/Sales/Microsoft/adworkslt/SalesLT/Address/Address_2021-07-28T20:00:32.207.csv",450]],"datasetInfos":[],"dbfsResultPath":null,"isJsonSchema":true,"metadata":{},"overflow":false,"plotOptions":{"customPlotOptions":{},"displayType":"table","pivotAggregation":null,"pivotColumns":null,"xColumns":null,"yColumns":null},"removedWidgets":[],"schema":[{"metadata":"{}","name":"file","type":"\"string\""},{"metadata":"{}","name":"count(1)","type":"\"long\""}],"type":"table"}},"output_type":"display_data"}],"source":["from pyspark.sql.functions import input_file_name, count\n","filesdf = (df\n"," .withColumn(\"file\",input_file_name())\n"," .groupBy(\"file\")\n"," .agg(count(\"*\"))\n"," )\n","display(filesdf)"]},{"cell_type":"code","execution_count":null,"metadata":{"application/vnd.databricks.v1+cell":{"inputWidgets":{},"nuid":"39d8feef-b048-44ab-aa1c-41ed4e55ca8b","showTitle":true,"title":"upsertToDelta"}},"outputs":[{"data":{"text/html":["\n","
"]},"metadata":{"application/vnd.databricks.v1+output":{"addedWidgets":{},"arguments":{},"data":"
","datasetInfos":[],"metadata":{},"removedWidgets":[],"type":"html"}},"output_type":"display_data"}],"source":["from delta.tables import *\n","def upsertToDelta(microBatchOutputDF, batchId):\n"," \n"," deltadf = DeltaTable.forName(spark,\"saleslt.address\")\n"," \n"," (deltadf.alias(\"t\")\n"," .merge(\n"," microBatchOutputDF.alias(\"s\"),\n"," \"s.AddressID = t.AddressID\")\n"," .whenMatchedUpdateAll()\n"," .whenNotMatchedInsertAll()\n"," .execute()\n"," )"]},{"cell_type":"code","execution_count":null,"metadata":{"application/vnd.databricks.v1+cell":{"inputWidgets":{},"nuid":"7c3e872f-ce5f-43e0-8218-e0601e4de264","showTitle":false,"title":""}},"outputs":[{"data":{"text/html":["\n","
"]},"metadata":{"application/vnd.databricks.v1+output":{"addedWidgets":{},"arguments":{},"data":"
","datasetInfos":[],"metadata":{},"removedWidgets":[],"type":"html"}},"output_type":"display_data"}],"source":["streamQuery = (df.writeStream\n"," .format(\"delta\")\n"," .outputMode(\"append\")\n"," .foreachBatch(upsertToDelta) # Comment this out first time you run\n"," .queryName(\"c-changeLoader-merge\") # Comment this out first time you run\n"," #.trigger(once=True)\n"," .option(\"checkpointLocation\", \"/mnt/sink/\"+sinkAdlsFolderName+\"/AutoLoader/_checkpoint\")\n"," .start(\"/mnt/sink/\"+sinkAdlsFolderName+\"/AutoLoader/data/\")\n",")"]},{"cell_type":"code","execution_count":null,"metadata":{"application/vnd.databricks.v1+cell":{"inputWidgets":{},"nuid":"9120764a-10e9-42f0-be3f-67f564138e50","showTitle":false,"title":""}},"outputs":[{"data":{"text/html":["\n","
"]},"metadata":{"application/vnd.databricks.v1+output":{"addedWidgets":{},"arguments":{},"data":"
","datasetInfos":[],"metadata":{},"removedWidgets":[],"type":"html"}},"output_type":"display_data"}],"source":["#dfsql = spark.read.format(\"delta\").load(\"/mnt/sink/\"+sinkAdlsFolderName+\"/AutoLoader/data/\")\n","#dfsql.show()"]},{"cell_type":"code","execution_count":null,"metadata":{"application/vnd.databricks.v1+cell":{"inputWidgets":{},"nuid":"afa09d47-f6c0-446c-88d3-30dd69d3a7b5","showTitle":false,"title":""}},"outputs":[{"data":{"text/html":["\n","
Out[16]: DataFrame[]
"]},"metadata":{"application/vnd.databricks.v1+output":{"addedWidgets":{},"arguments":{},"data":"
Out[16]: DataFrame[]
","datasetInfos":[],"metadata":{},"removedWidgets":[],"type":"html"}},"output_type":"display_data"}],"source":["spark.sql(\"CREATE DATABASE IF NOT EXISTS saleslt\")\n","spark.sql(\"CREATE TABLE IF NOT EXISTS saleslt.address USING DELTA LOCATION '/mnt/sink/\"+sinkAdlsFolderName+\"/AutoLoader/data/'\")"]},{"cell_type":"code","execution_count":null,"metadata":{"application/vnd.databricks.v1+cell":{"inputWidgets":{},"nuid":"e3cb2d8e-5228-4a0f-977b-36241b301276","showTitle":false,"title":""}},"outputs":[{"data":{"text/html":["
AddressID | AddressLine1 | AddressLine2 | City | StateProvince | CountryRegion | PostalCode | rowguid | ModifiedDate
867 | 48995 Evergreen Wy. | null | Everett | Washington | United States | 98201 | 6cd4e374-0ef8-4cee-ac60-9dbf1f4e0007 | 2007-09-01 00:00:00.0000000
871 | 2502 Evergreen Ste E | null | Everett | Washington | United States | 98201 | 3ae9003f-f806-41ad-98a5-3c4a86514bc3 | 2005-07-01 00:00:00.0000000
897 | 705 SE Mall Parkway | null | Everett | Washington | United States | 98201 | 77a34652-5d9b-40b6-a15a-56d7ee4056f4 | 2006-08-01 00:00:00.0000000
11385 | 13833 55th Drive SE | null | Everett | WA | USA | 98208 | 29805efe-60fc-4df9-a299-8c1bff87191a | 2021-07-28 20:29:02.0000000
"]},"metadata":{"application/vnd.databricks.v1+output":{"addedWidgets":{},"aggData":[],"aggError":"","aggOverflow":false,"aggSchema":[],"aggSeriesLimitReached":false,"aggType":"","arguments":{},"columnCustomDisplayInfos":{},"data":[[867,"48995 Evergreen Wy.",null,"Everett","Washington","United States","98201","6cd4e374-0ef8-4cee-ac60-9dbf1f4e0007","2007-09-01 00:00:00.0000000"],[871,"2502 Evergreen Ste E",null,"Everett","Washington","United States","98201","3ae9003f-f806-41ad-98a5-3c4a86514bc3","2005-07-01 00:00:00.0000000"],[897,"705 SE Mall Parkway",null,"Everett","Washington","United States","98201","77a34652-5d9b-40b6-a15a-56d7ee4056f4","2006-08-01 00:00:00.0000000"],[11385,"13833 55th Drive SE",null,"Everett","WA","USA","98208","29805efe-60fc-4df9-a299-8c1bff87191a","2021-07-28 20:29:02.0000000"]],"datasetInfos":[],"dbfsResultPath":null,"isJsonSchema":true,"metadata":{},"overflow":false,"plotOptions":{"customPlotOptions":{},"displayType":"table","pivotAggregation":null,"pivotColumns":null,"xColumns":null,"yColumns":null},"removedWidgets":[],"schema":[{"metadata":"{}","name":"AddressID","type":"\"integer\""},{"metadata":"{}","name":"AddressLine1","type":"\"string\""},{"metadata":"{}","name":"AddressLine2","type":"\"string\""},{"metadata":"{}","name":"City","type":"\"string\""},{"metadata":"{}","name":"StateProvince","type":"\"string\""},{"metadata":"{}","name":"CountryRegion","type":"\"string\""},{"metadata":"{}","name":"PostalCode","type":"\"string\""},{"metadata":"{}","name":"rowguid","type":"\"string\""},{"metadata":"{}","name":"ModifiedDate","type":"\"string\""}],"type":"table"}},"output_type":"display_data"}],"source":["%sql\n","\n","SELECT * FROM saleslt.address where City = 'Everett'"]},{"cell_type":"code","execution_count":null,"metadata":{"application/vnd.databricks.v1+cell":{"inputWidgets":{},"nuid":"20aae39e-8a8a-4bb7-a7dd-2e2f8fda5378","showTitle":false,"title":""}},"outputs":[],"source":[]}],"metadata":{"application/vnd.databricks.v1+notebook":{"dashboards":[],"language":"python","notebookMetadata":{"pythonIndentUnit":2},"notebookName":"autoloadersp","notebookOrigID":3796702961398816,"widgets":{}},"language_info":{"name":"python"}},"nbformat":4,"nbformat_minor":0} +{"cells":[{"cell_type":"code","execution_count":null,"metadata":{"application/vnd.databricks.v1+cell":{"inputWidgets":{},"nuid":"e4c95154-bb9a-4815-a33a-db28d83a37c3","showTitle":true,"title":"Define the variables used for creating connection stringsb"}},"outputs":[{"data":{"text/html":["\n","
"]},"metadata":{"application/vnd.databricks.v1+output":{"addedWidgets":{},"arguments":{},"data":"
","datasetInfos":[],"metadata":{},"removedWidgets":[],"type":"html"}},"output_type":"display_data"}],"source":["adlsAccountName = \"\"\n","sourceAdlsContainerName = \"bronze\"\n","sinkAdlsContainerName = \"silver\"\n","sourceAdlsFolderName = \"CDC/Sales/Microsoft/AdventureWorksLT/SalesLT/Address\"\n","sinkAdlsFolderName = \"CDC/Sales/Microsoft/AdventureWorksLT/SalesLT/Address\""]},{"cell_type":"code","execution_count":null,"metadata":{"application/vnd.databricks.v1+cell":{"inputWidgets":{},"nuid":"c413f414-4370-4ec7-bf74-d725259a8f55","showTitle":false,"title":""}},"outputs":[{"data":{"text/html":["\n","
"]},"metadata":{"application/vnd.databricks.v1+output":{"addedWidgets":{},"arguments":{},"data":"
","datasetInfos":[],"metadata":{},"removedWidgets":[],"type":"html"}},"output_type":"display_data"}],"source":["spark.conf.set(\n"," \"fs.azure.account.key.\" + adlsAccountName + \".dfs.core.windows.net\",\n"," dbutils.secrets.get(scope=\"\",key=\"Adls2-KeySecret\"))"]},{"cell_type":"code","execution_count":null,"metadata":{"application/vnd.databricks.v1+cell":{"inputWidgets":{},"nuid":"98676d8b-3241-45a2-9a85-e6d8e71ef65c","showTitle":false,"title":""}},"outputs":[{"data":{"text/html":["\n","
Out[3]: [FileInfo(path='abfss://bronze@dataccelerr267cb5wtgfxg.dfs.core.windows.net/CAAPP/', name='CAAPP/', size=0)]
"]},"metadata":{"application/vnd.databricks.v1+output":{"addedWidgets":{},"arguments":{},"data":"
Out[3]: [FileInfo(path='abfss://bronze@dataccelerr267cb5wtgfxg.dfs.core.windows.net/CAAPP/', name='CAAPP/', size=0)]
","datasetInfos":[],"metadata":{},"removedWidgets":[],"type":"html"}},"output_type":"display_data"}],"source":["dbutils.fs.ls(\"abfss://\" + sourceAdlsContainerName + \"@\" + adlsAccountName + \".dfs.core.windows.net/\")"]},{"cell_type":"code","execution_count":null,"metadata":{"application/vnd.databricks.v1+cell":{"inputWidgets":{},"nuid":"b53d3657-2004-4a6f-807f-8e1d639b858a","showTitle":true,"title":"Get Secrets"}},"outputs":[{"data":{"text/html":["\n","
"]},"metadata":{"application/vnd.databricks.v1+output":{"addedWidgets":{},"arguments":{},"data":"
","datasetInfos":[],"metadata":{},"removedWidgets":[],"type":"html"}},"output_type":"display_data"}],"source":["SubscriptionID = dbutils.secrets.get(\"\",\"SubscriptionID\")\n","DirectoryID = dbutils.secrets.get(\"\",\"DirectoryID\")\n","ServicePrincipalAppID = dbutils.secrets.get(\"\",\"ServicePrincipalAppID\")\n","ServicePrincipalSecret = dbutils.secrets.get(\"\",\"AppSecret\")\n","ResourceGroup = dbutils.secrets.get(\"\",\"ResourceGroup\")\n","BlobConnectionKey = dbutils.secrets.get(\"\",\"Adls2-KeySecret\")"]},{"cell_type":"code","execution_count":null,"metadata":{"application/vnd.databricks.v1+cell":{"inputWidgets":{},"nuid":"1c20b8e8-085c-4fd8-91ea-c415150af7e7","showTitle":true,"title":"Configure authentication for mounting"}},"outputs":[{"data":{"text/html":["\n","
"]},"metadata":{"application/vnd.databricks.v1+output":{"addedWidgets":{},"arguments":{},"data":"
","datasetInfos":[],"metadata":{},"removedWidgets":[],"type":"html"}},"output_type":"display_data"}],"source":["configs = {\"fs.azure.account.auth.type\": \"OAuth\",\n"," \"fs.azure.account.oauth.provider.type\": \"org.apache.hadoop.fs.azurebfs.oauth2.ClientCredsTokenProvider\",\n"," \"fs.azure.account.oauth2.client.id\": ServicePrincipalAppID,\n"," \"fs.azure.account.oauth2.client.secret\": ServicePrincipalSecret,\n"," \"fs.azure.account.oauth2.client.endpoint\": \"https://login.microsoftonline.com/\"+DirectoryID+\"/oauth2/token\"}"]},{"cell_type":"code","execution_count":null,"metadata":{"application/vnd.databricks.v1+cell":{"inputWidgets":{},"nuid":"9a21418c-bfa6-4c8b-a975-32d80f92ca25","showTitle":false,"title":""}},"outputs":[{"data":{"text/html":["\n","
/mnt/source has been unmounted.\n","/mnt/sink has been unmounted.\n","Out[6]: True
"]},"metadata":{"application/vnd.databricks.v1+output":{"addedWidgets":{},"arguments":{},"data":"
/mnt/source has been unmounted.\n/mnt/sink has been unmounted.\nOut[6]: True
","datasetInfos":[],"metadata":{},"removedWidgets":[],"type":"html"}},"output_type":"display_data"}],"source":["dbutils.fs.unmount(\"/mnt/source\")\n","dbutils.fs.unmount(\"/mnt/sink\") "]},{"cell_type":"code","execution_count":null,"metadata":{"application/vnd.databricks.v1+cell":{"inputWidgets":{},"nuid":"4d1cb073-3e54-4582-9efe-9bee659e15ff","showTitle":true,"title":"Mount Source and Sink filesystem"}},"outputs":[{"data":{"text/html":["\n","
Out[7]: True
"]},"metadata":{"application/vnd.databricks.v1+output":{"addedWidgets":{},"arguments":{},"data":"
Out[7]: True
","datasetInfos":[],"metadata":{},"removedWidgets":[],"type":"html"}},"output_type":"display_data"}],"source":["dbutils.fs.mount(\n"," source = \"abfss://\"+sourceAdlsContainerName+\"@\"+adlsAccountName+\".dfs.core.windows.net/\",\n"," mount_point = \"/mnt/source\",\n"," extra_configs = configs)\n","\n","\n","dbutils.fs.mount(\n"," source = \"abfss://\"+sinkAdlsContainerName+\"@\"+adlsAccountName+\".dfs.core.windows.net/\",\n"," mount_point = \"/mnt/sink\",\n"," extra_configs = configs)"]},{"cell_type":"code","execution_count":null,"metadata":{"application/vnd.databricks.v1+cell":{"inputWidgets":{},"nuid":"8d8df1d4-d435-4d65-bc58-dfb177cf4b22","showTitle":false,"title":""}},"outputs":[{"data":{"text/html":["\n","
Out[8]: [FileInfo(path='dbfs:/mnt/sink/', name='sink/', size=0),\n"," FileInfo(path='dbfs:/mnt/source/', name='source/', size=0)]
"]},"metadata":{"application/vnd.databricks.v1+output":{"addedWidgets":{},"arguments":{},"data":"
Out[8]: [FileInfo(path='dbfs:/mnt/sink/', name='sink/', size=0),\n FileInfo(path='dbfs:/mnt/source/', name='source/', size=0)]
","datasetInfos":[],"metadata":{},"removedWidgets":[],"type":"html"}},"output_type":"display_data"}],"source":["dbutils.fs.ls (\"/mnt/\")"]},{"cell_type":"markdown","metadata":{"application/vnd.databricks.v1+cell":{"inputWidgets":{},"nuid":"fe8d3625-630f-4f68-ae34-39f04feb53c3","showTitle":false,"title":""}},"source":["##Intialize AutoLoader"]},{"cell_type":"code","execution_count":null,"metadata":{"application/vnd.databricks.v1+cell":{"inputWidgets":{},"nuid":"ae93452a-d015-4c82-9dcf-dd6989a9fe41","showTitle":false,"title":""}},"outputs":[{"data":{"text/html":["\n","
"]},"metadata":{"application/vnd.databricks.v1+output":{"addedWidgets":{},"arguments":{},"data":"
","datasetInfos":[],"metadata":{},"removedWidgets":[],"type":"html"}},"output_type":"display_data"}],"source":["from pyspark.sql.types import *\n","import json\n","\n","jschema = '{\"fields\":[{\"metadata\":{},\"name\":\"AddressID\",\"nullable\":true,\"type\":\"integer\"},{\"metadata\":{},\"name\":\"AddressLine1\",\"nullable\":true,\"type\":\"string\"},{\"metadata\":{},\"name\":\"AddressLine2\",\"nullable\":true,\"type\":\"string\"},{\"metadata\":{},\"name\":\"City\",\"nullable\":true,\"type\":\"string\"},{\"metadata\":{},\"name\":\"StateProvince\",\"nullable\":true,\"type\":\"string\"},{\"metadata\":{},\"name\":\"CountryRegion\",\"nullable\":true,\"type\":\"string\"},{\"metadata\":{},\"name\":\"PostalCode\",\"nullable\":true,\"type\":\"string\"},{\"metadata\":{},\"name\":\"rowguid\",\"nullable\":true,\"type\":\"string\"},{\"metadata\":{},\"name\":\"ModifiedDate\",\"nullable\":true,\"type\":\"string\"}],\"type\":\"struct\"}'\n","\n","schema = StructType.fromJson(json.loads(jschema))"]},{"cell_type":"code","execution_count":null,"metadata":{"application/vnd.databricks.v1+cell":{"inputWidgets":{},"nuid":"bc877dbf-e0c1-4513-b620-743ca55e2f1b","showTitle":true,"title":"Build cloudFiles Config"}},"outputs":[{"data":{"text/html":["\n","
"]},"metadata":{"application/vnd.databricks.v1+output":{"addedWidgets":{},"arguments":{},"data":"
","datasetInfos":[],"metadata":{},"removedWidgets":[],"type":"html"}},"output_type":"display_data"}],"source":["cloudfile = {\n"," \"cloudFiles.subscriptionId\":SubscriptionID,\n"," \"cloudFiles.format\":\"csv\",\n"," \"cloudFiles.tenantId\":DirectoryID,\n"," \"cloudFiles.clientId\":ServicePrincipalAppID,\n"," \"cloudFiles.clientSecret\":ServicePrincipalSecret,\n"," \"cloudFiles.resourceGroup\":ResourceGroup,\n"," \"cloudFiles.useNotifications\": \"true\", \n","}"]},{"cell_type":"markdown","metadata":{"application/vnd.databricks.v1+cell":{"inputWidgets":{},"nuid":"33870c3d-ccc8-4d59-9ba2-f5544fae8162","showTitle":false,"title":""}},"source":["##Build Streaming Dataframe"]},{"cell_type":"code","execution_count":null,"metadata":{"application/vnd.databricks.v1+cell":{"inputWidgets":{},"nuid":"2c776687-6e12-4754-912c-eb72a477675f","showTitle":false,"title":""}},"outputs":[{"data":{"text/html":["\n","
"]},"metadata":{"application/vnd.databricks.v1+output":{"addedWidgets":{},"arguments":{},"data":"
","datasetInfos":[],"metadata":{},"removedWidgets":[],"type":"html"}},"output_type":"display_data"}],"source":["filePath = \"/mnt/source/\"+sourceAdlsFolderName+\"/\"\n","df = (spark\n"," .readStream\n"," .format(\"cloudFiles\")\n"," .schema(schema)\n"," .options(**cloudfile)\n"," .option(\"Header\",True)\n"," .option(\"cloudFiles.schemaLocation\",\"/mnt/source/\"+sourceAdlsFolderName+\"/_schema\") \n"," .load(filePath))"]},{"cell_type":"code","execution_count":null,"metadata":{"application/vnd.databricks.v1+cell":{"inputWidgets":{},"nuid":"19be7af0-05ea-4a40-9c3b-0f4e25818cb5","showTitle":false,"title":""}},"outputs":[{"data":{"text/html":["
file | count(1)
/mnt/source/CAAPP/Sales/Microsoft/adworkslt/SalesLT/Address/Address_2021-07-27T03:48:44.487.csv | 450
/mnt/source/CAAPP/Sales/Microsoft/adworkslt/SalesLT/Address/Address_2021-07-28T20:48:53.08.csv | 451
/mnt/source/CAAPP/Sales/Microsoft/adworkslt/SalesLT/Address/Address_2021-07-27T01:30:41.2.csv | 450
/mnt/source/CAAPP/Sales/Microsoft/adworkslt/SalesLT/Address/Address_2021-07-27T03:36:43.23.csv | 451
/mnt/source/CAAPP/Sales/Microsoft/adworkslt/SalesLT/Address/Address_2021-07-28T20:42:08.65.csv | 451
/mnt/source/CAAPP/Sales/Microsoft/adworkslt/SalesLT/Address/Address_2021-07-29T00:39:23.16.csv | 451
/mnt/source/CAAPP/Sales/Microsoft/adworkslt/SalesLT/Address/Address_2021-07-28T20:00:32.207.csv | 450
"]},"metadata":{"application/vnd.databricks.v1+output":{"addedWidgets":{},"aggData":[],"aggError":"","aggOverflow":false,"aggSchema":[],"aggSeriesLimitReached":false,"aggType":"","arguments":{},"columnCustomDisplayInfos":{},"data":[["/mnt/source/CAAPP/Sales/Microsoft/adworkslt/SalesLT/Address/Address_2021-07-27T03:48:44.487.csv",450],["/mnt/source/CAAPP/Sales/Microsoft/adworkslt/SalesLT/Address/Address_2021-07-28T20:48:53.08.csv",451],["/mnt/source/CAAPP/Sales/Microsoft/adworkslt/SalesLT/Address/Address_2021-07-27T01:30:41.2.csv",450],["/mnt/source/CAAPP/Sales/Microsoft/adworkslt/SalesLT/Address/Address_2021-07-27T03:36:43.23.csv",451],["/mnt/source/CAAPP/Sales/Microsoft/adworkslt/SalesLT/Address/Address_2021-07-28T20:42:08.65.csv",451],["/mnt/source/CAAPP/Sales/Microsoft/adworkslt/SalesLT/Address/Address_2021-07-29T00:39:23.16.csv",451],["/mnt/source/CAAPP/Sales/Microsoft/adworkslt/SalesLT/Address/Address_2021-07-28T20:00:32.207.csv",450]],"datasetInfos":[],"dbfsResultPath":null,"isJsonSchema":true,"metadata":{},"overflow":false,"plotOptions":{"customPlotOptions":{},"displayType":"table","pivotAggregation":null,"pivotColumns":null,"xColumns":null,"yColumns":null},"removedWidgets":[],"schema":[{"metadata":"{}","name":"file","type":"\"string\""},{"metadata":"{}","name":"count(1)","type":"\"long\""}],"type":"table"}},"output_type":"display_data"}],"source":["from pyspark.sql.functions import input_file_name, count\n","filesdf = (df\n"," .withColumn(\"file\",input_file_name())\n"," .groupBy(\"file\")\n"," .agg(count(\"*\"))\n"," )\n","display(filesdf)"]},{"cell_type":"code","execution_count":null,"metadata":{"application/vnd.databricks.v1+cell":{"inputWidgets":{},"nuid":"39d8feef-b048-44ab-aa1c-41ed4e55ca8b","showTitle":true,"title":"upsertToDelta"}},"outputs":[{"data":{"text/html":["\n","
"]},"metadata":{"application/vnd.databricks.v1+output":{"addedWidgets":{},"arguments":{},"data":"
","datasetInfos":[],"metadata":{},"removedWidgets":[],"type":"html"}},"output_type":"display_data"}],"source":["from delta.tables import *\n","def upsertToDelta(microBatchOutputDF, batchId):\n"," \n"," deltadf = DeltaTable.forName(spark,\"saleslt.address\")\n"," \n"," (deltadf.alias(\"t\")\n"," .merge(\n"," microBatchOutputDF.alias(\"s\"),\n"," \"s.AddressID = t.AddressID\")\n"," .whenMatchedUpdateAll()\n"," .whenNotMatchedInsertAll()\n"," .execute()\n"," )"]},{"cell_type":"code","execution_count":null,"metadata":{"application/vnd.databricks.v1+cell":{"inputWidgets":{},"nuid":"7c3e872f-ce5f-43e0-8218-e0601e4de264","showTitle":false,"title":""}},"outputs":[{"data":{"text/html":["\n","
"]},"metadata":{"application/vnd.databricks.v1+output":{"addedWidgets":{},"arguments":{},"data":"
","datasetInfos":[],"metadata":{},"removedWidgets":[],"type":"html"}},"output_type":"display_data"}],"source":["streamQuery = (df.writeStream\n"," .format(\"delta\")\n"," .outputMode(\"append\")\n"," .option(\"mergeSchema\", \"true\") # Added this since it is now required\n"," #.foreachBatch(upsertToDelta) # Comment this out first time you run but remove # for subsequent runs\n"," #.queryName(\"c-changeLoader-merge\") # Comment this out first time you run but remove # for subsequent runs\n"," .trigger(once=True) # Comment this out after the first run so it runs continuously. Remove # on the foreachBatch and queryName lines above\n"," .option(\"checkpointLocation\", \"/mnt/sink/\"+sinkAdlsFolderName+\"/AutoLoader/_checkpoint\")\n"," .start(\"/mnt/sink/\"+sinkAdlsFolderName+\"/AutoLoader/data/\")\n",")"]},{"cell_type":"code","execution_count":null,"metadata":{"application/vnd.databricks.v1+cell":{"inputWidgets":{},"nuid":"9120764a-10e9-42f0-be3f-67f564138e50","showTitle":false,"title":""}},"outputs":[{"data":{"text/html":["\n","
"]},"metadata":{"application/vnd.databricks.v1+output":{"addedWidgets":{},"arguments":{},"data":"
","datasetInfos":[],"metadata":{},"removedWidgets":[],"type":"html"}},"output_type":"display_data"}],"source":["#dfsql = spark.read.format(\"delta\").load(\"/mnt/sink/\"+sinkAdlsFolderName+\"/AutoLoader/data/\")\n","#dfsql.show()"]},{"cell_type":"code","execution_count":null,"metadata":{"application/vnd.databricks.v1+cell":{"inputWidgets":{},"nuid":"afa09d47-f6c0-446c-88d3-30dd69d3a7b5","showTitle":false,"title":""}},"outputs":[{"data":{"text/html":["\n","
Out[16]: DataFrame[]
"]},"metadata":{"application/vnd.databricks.v1+output":{"addedWidgets":{},"arguments":{},"data":"
Out[16]: DataFrame[]
","datasetInfos":[],"metadata":{},"removedWidgets":[],"type":"html"}},"output_type":"display_data"}],"source":["spark.sql(\"CREATE DATABASE IF NOT EXISTS saleslt\")\n","spark.sql(\"CREATE TABLE IF NOT EXISTS saleslt.address USING DELTA LOCATION '/mnt/sink/\"+sinkAdlsFolderName+\"/AutoLoader/data/'\")"]},{"cell_type":"code","execution_count":null,"metadata":{"application/vnd.databricks.v1+cell":{"inputWidgets":{},"nuid":"e3cb2d8e-5228-4a0f-977b-36241b301276","showTitle":false,"title":""}},"outputs":[{"data":{"text/html":["
AddressID | AddressLine1 | AddressLine2 | City | StateProvince | CountryRegion | PostalCode | rowguid | ModifiedDate
867 | 48995 Evergreen Wy. | null | Everett | Washington | United States | 98201 | 6cd4e374-0ef8-4cee-ac60-9dbf1f4e0007 | 2007-09-01 00:00:00.0000000
871 | 2502 Evergreen Ste E | null | Everett | Washington | United States | 98201 | 3ae9003f-f806-41ad-98a5-3c4a86514bc3 | 2005-07-01 00:00:00.0000000
897 | 705 SE Mall Parkway | null | Everett | Washington | United States | 98201 | 77a34652-5d9b-40b6-a15a-56d7ee4056f4 | 2006-08-01 00:00:00.0000000
11385 | 13833 55th Drive SE | null | Everett | WA | USA | 98208 | 29805efe-60fc-4df9-a299-8c1bff87191a | 2021-07-28 20:29:02.0000000
"]},"metadata":{"application/vnd.databricks.v1+output":{"addedWidgets":{},"aggData":[],"aggError":"","aggOverflow":false,"aggSchema":[],"aggSeriesLimitReached":false,"aggType":"","arguments":{},"columnCustomDisplayInfos":{},"data":[[867,"48995 Evergreen Wy.",null,"Everett","Washington","United States","98201","6cd4e374-0ef8-4cee-ac60-9dbf1f4e0007","2007-09-01 00:00:00.0000000"],[871,"2502 Evergreen Ste E",null,"Everett","Washington","United States","98201","3ae9003f-f806-41ad-98a5-3c4a86514bc3","2005-07-01 00:00:00.0000000"],[897,"705 SE Mall Parkway",null,"Everett","Washington","United States","98201","77a34652-5d9b-40b6-a15a-56d7ee4056f4","2006-08-01 00:00:00.0000000"],[11385,"13833 55th Drive SE",null,"Everett","WA","USA","98208","29805efe-60fc-4df9-a299-8c1bff87191a","2021-07-28 20:29:02.0000000"]],"datasetInfos":[],"dbfsResultPath":null,"isJsonSchema":true,"metadata":{},"overflow":false,"plotOptions":{"customPlotOptions":{},"displayType":"table","pivotAggregation":null,"pivotColumns":null,"xColumns":null,"yColumns":null},"removedWidgets":[],"schema":[{"metadata":"{}","name":"AddressID","type":"\"integer\""},{"metadata":"{}","name":"AddressLine1","type":"\"string\""},{"metadata":"{}","name":"AddressLine2","type":"\"string\""},{"metadata":"{}","name":"City","type":"\"string\""},{"metadata":"{}","name":"StateProvince","type":"\"string\""},{"metadata":"{}","name":"CountryRegion","type":"\"string\""},{"metadata":"{}","name":"PostalCode","type":"\"string\""},{"metadata":"{}","name":"rowguid","type":"\"string\""},{"metadata":"{}","name":"ModifiedDate","type":"\"string\""}],"type":"table"}},"output_type":"display_data"}],"source":["%sql\n","\n","SELECT * FROM saleslt.address where City = 'Everett'"]},{"cell_type":"code","execution_count":null,"metadata":{"application/vnd.databricks.v1+cell":{"inputWidgets":{},"nuid":"20aae39e-8a8a-4bb7-a7dd-2e2f8fda5378","showTitle":false,"title":""}},"outputs":[],"source":[]}],"metadata":{"application/vnd.databricks.v1+notebook":{"dashboards":[],"language":"python","notebookMetadata":{"pythonIndentUnit":2},"notebookName":"autoloadersp","notebookOrigID":3796702961398816,"widgets":{}},"language_info":{"name":"python"}},"nbformat":4,"nbformat_minor":0} diff --git a/SamplesV2/ChangeDataCapture/usecases/cdc/steps/usecasesteps.md b/SamplesV2/ChangeDataCapture/usecases/cdc/steps/usecasesteps.md old mode 100644 new mode 100755 index a1282c60..0ee80092 --- a/SamplesV2/ChangeDataCapture/usecases/cdc/steps/usecasesteps.md +++ b/SamplesV2/ChangeDataCapture/usecases/cdc/steps/usecasesteps.md @@ -1,420 +1,412 @@ -# Change Data Capture Steps - -Please follow these steps to configure the sample - -## Step 1 Create Control Table and Stored Procedure used by Azure Data Factory - -You can create the tables by connecting to the Azure SQL Database deployed by the ARM template using a tool like [Azure Data Studio](https://docs.microsoft.com/en-us/sql/azure-data-studio/download-azure-data-studio?view=sql-server-ver15) - -Use the following SQL script [ControlTableForSourceToSink.sql](https://github.com/Azure/Azure-DataFactory/blob/main/SamplesV2/ChangeDataCapture/usecases/cdc/code/sqlscripts/ControlTableForSourceToSink.sql) to create the ControlTableForSourceToSink table in the database deployed by the ARM template. 
- -![Step 1 table](https://raw.githubusercontent.com/Azure/Azure-DataFactory/main/SamplesV2/ChangeDataCapture/images/cdcstep1table.png) - -Use the following SQL script [spUpdateWatermark.sql](https://github.com/Azure/Azure-DataFactory/blob/main/SamplesV2/ChangeDataCapture/usecases/cdc/code/sqlscripts/spUpdateWatermark.sql) to create the spUpdateWatermark stored procedure in the database deployed by the ARM template. - -![Step 1 sproc](https://raw.githubusercontent.com/Azure/Azure-DataFactory/main/SamplesV2/ChangeDataCapture/images/cdcstep1sproc.png) - -Use the following SQL script [CreateStudent.sql](https://github.com/Azure/Azure-DataFactory/blob/main/SamplesV2/ChangeDataCapture/usecases/cdc/code/sqlscripts/CreateStudent.sql) to create the studentMath table in the database deployed by the ARM template. - -![Step 1 student](https://raw.githubusercontent.com/Azure/Azure-DataFactory/main/SamplesV2/ChangeDataCapture/images/cdcstep1student.png) - -## Step 2 Create Azure Data Factory Pipeline from local Template - -Download [ADF Template zip](https://github.com/Azure/Azure-DataFactory/tree/main/SamplesV2/ChangeDataCapture/usecases/cdc/code/adfTemplates) or find it in your cloned GitHub Repo. - -![adftemplatezip](https://raw.githubusercontent.com/Azure/Azure-DataFactory/main/SamplesV2/ChangeDataCapture/images/adftemplatezip.png) - -Open up the ADF deployed by the ARM template. Select Pipeline > Import from pipeline template - -![adfplfromtemplate](https://raw.githubusercontent.com/Azure/Azure-DataFactory/main/SamplesV2/ChangeDataCapture/images/adfplfromtemplate.png) - -Click on the zip file and click Open - -![adfOpenLocalTemplate](https://raw.githubusercontent.com/Azure/Azure-DataFactory/main/SamplesV2/ChangeDataCapture/images/adfOpenLocalTemplate.png) - -It should look like this - -![adftemplateUserinputs](https://raw.githubusercontent.com/Azure/Azure-DataFactory/main/SamplesV2/ChangeDataCapture/images/adftemplateUserinputs.png) - -Select +New in the first User input and create an Azure SQL Database Linked Service to the database deployed by the ARM template. 
- -![adfDatabaseLinkedService](https://raw.githubusercontent.com/Azure/Azure-DataFactory/main/SamplesV2/ChangeDataCapture/images/adfDatabaseLinkedService.png) - -Select +New in the second User input and create an Azure Data Lake Storage Gen2 Linked Service - -![adfAdlsLinkedService](https://raw.githubusercontent.com/Azure/Azure-DataFactory/main/SamplesV2/ChangeDataCapture/images/adfAdlsLinkedService.png) - -For Input 3 select the same Database you chose in Input 1 - -For Input 4 select the same Database you chose in Input 1 - -For Input 5 select the same Storage input you chose in Input 2 - -For Input 6 select the same Database you chose in Input 1 - -For Input 7 select the same Database you chose in Input 1 - -Then click on Use this template - -![adfAllUserinputs.png](https://raw.githubusercontent.com/Azure/Azure-DataFactory/main/SamplesV2/ChangeDataCapture/images/adfAllUserinputs.png) - -It should look like this when it is imported - -![adfTemplateImported](https://raw.githubusercontent.com/Azure/Azure-DataFactory/main/SamplesV2/ChangeDataCapture/images/adfTemplateImported.png) - -## Step 3 Debug the DeltaCopyAndFullCopyfromDB_using_ControlTable Pipeline - -Click on Debug, enter the name of the Control table `ControlTableForSourceToSink` -Click OK - -![adfDebugPipelineRun](https://raw.githubusercontent.com/Azure/Azure-DataFactory/main/SamplesV2/ChangeDataCapture/images/adfDebugPipelineRun.png) - -Once the pipeline runs successfully it should look like this - -![adfSuccessfulRun](https://raw.githubusercontent.com/Azure/Azure-DataFactory/main/SamplesV2/ChangeDataCapture/images/adfSuccessfulRun.png) - -Check that the files have been created in Storage using [Azure Storage Explorer](https://azure.microsoft.com/en-us/features/storage-explorer/) or the Azure Portal in the browser. The files should be in bronze container at a path like `CDC/Sales/Microsoft/AdventureWorksLT/SalesLT/Address/` - -![adfFileInStorage](https://raw.githubusercontent.com/Azure/Azure-DataFactory/main/SamplesV2/ChangeDataCapture/images/adfFileInStorage.png) - -If you run the pipeline a second time you will see another file created. Since the Address table has a ModifiedDate column which can be used as a Watermark the second file (smaller 102 bytes) only contains a header since there were no changes (unless some updates are done to the Address table) - -![adfFileInStorage2](https://raw.githubusercontent.com/Azure/Azure-DataFactory/main/SamplesV2/ChangeDataCapture/images/adfFileInStorage2.png) - -If you compare this file to the studentMath files (which does not have a watermark column) they are the same size because it in not doing a delta. The file will get larger as inserts and update happen in the studentMath table. 
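If you would rather verify this from code than from Storage Explorer, a small PySpark check like the sketch below (run in the Databricks notebook after the mount cells in Step 4) lists each CSV the pipeline wrote for the Address table together with its row count. The folder path is the sample's default, so adjust it if you changed the control table entries.

```
# Quick sanity check of the files ADF wrote to the bronze container.
# Run in the Databricks notebook after /mnt/source has been mounted (Step 4).
# The folder path matches the sample defaults; adjust it if your control table rows differ.
from pyspark.sql.functions import input_file_name, count

address_path = "/mnt/source/CDC/Sales/Microsoft/AdventureWorksLT/SalesLT/Address/*.csv"

rows_per_file = (spark.read.option("header", True).csv(address_path)
                 .withColumn("file", input_file_name())
                 .groupBy("file")
                 .agg(count("*").alias("rows")))

# A file that contains only a header contributes no data rows and drops out of the grouping.
display(rows_per_file)
```

Pointing the same check at the studentMath folder should show each file carrying the full row count, since that table has no watermark column.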
- -![adfFileInStorage3](https://raw.githubusercontent.com/Azure/Azure-DataFactory/main/SamplesV2/ChangeDataCapture/images/adfFileInStorage3.png) - - -You can now save the pipeline by clicking on Publish all - -![adfPublishAll](https://raw.githubusercontent.com/Azure/Azure-DataFactory/main/SamplesV2/ChangeDataCapture/images/adfPublishAll.png) - -## Step 4 Import, configure, and run the Databricks notebook - -### Requirements - -- Databricks Runtime 8.3 or above when you create your cluster - -- Setup Permissions to ADLS Gen2 - -- Secrets in Key vault - -*Steps* - -### Import the Databricks notebook - -Open up you Databricks workspace and navigate to your user, select the dropdown and select import - -![adbworkspace](https://raw.githubusercontent.com/Azure/Azure-DataFactory/main/SamplesV2/ChangeDataCapture/images/adbworkspace.png) - -Import from file if you cloned the repo locally or enter the URL `https://raw.githubusercontent.com/Azure/Azure-DataFactory/main/SamplesV2/ChangeDataCapture/usecases/cdc/code/notebooks/autoloadersp.ipynb` to the Notebook in GitHub Repo [autoloadersp.ipynb](https://raw.githubusercontent.com/Azure/Azure-DataFactory/main/SamplesV2/ChangeDataCapture/usecases/cdc/code/notebooks/autoloadersp.ipynb) and click Import - -![adbnotebookimport](https://raw.githubusercontent.com/Azure/Azure-DataFactory/main/SamplesV2/ChangeDataCapture/images/adbnotebookimport.png) - -You should now have a notebook that looks like this: - -![adbnotebook](https://raw.githubusercontent.com/Azure/Azure-DataFactory/main/SamplesV2/ChangeDataCapture/images/adbnotebook.png) - -Change the value of the adlsAccountName = "" in cell one to the ADLS AccountName of in your deployment - -In my chase my deployment has a Storage account name of `adfacceler7kdgtkhj5mpoa` so the first row of the cell would read: - -``` -adlsAccountName = "adfacceler7kdgtkhj5mpoa" -``` - -![adbrgservices](https://raw.githubusercontent.com/Azure/Azure-DataFactory/main/SamplesV2/ChangeDataCapture/images/adbrgservices.png) - -Note if you change any column values in the `ControlTableForSourceToSink` table make the appropriate changes. - -![adbfolderpath](https://raw.githubusercontent.com/Azure/Azure-DataFactory/main/SamplesV2/ChangeDataCapture/images/adbfolderpath.png) - -The notebook would now look like this: - -``` -sourceAdlsFolderName = CDC/Sales/Microsoft/AdventureWorksLT/SalesLT/Address -sinkAdlsFolderName = CDC/Sales/Microsoft/AdventureWorksLT/SalesLT/Address -``` - -![adbadlsacctname](https://raw.githubusercontent.com/Azure/Azure-DataFactory/main/SamplesV2/ChangeDataCapture/images/adbadlsacctname.png) - -### Configure Service Principal and Permissions - -*Create a Service principal* [Reference](https://docs.microsoft.com/en-us/azure/active-directory/develop/howto-create-service-principal-portal#register-an-application-with-azure-ad-and-create-a-service-principal) - -Create an [Azure Active Directory app and service principal](https://docs.microsoft.com/en-us/azure/active-directory/develop/howto-create-service-principal-portal) in the form of client ID and client secret. - -1. Sign in to your Azure Account through the Azure portal. - -2. Select Azure Active Directory. - -3. Select App registrations. - -![adbappreg](https://raw.githubusercontent.com/Azure/Azure-DataFactory/main/SamplesV2/ChangeDataCapture/images/adbappreg.png) - -4. Select New registration. - -Name the application something like `autoloader-darsch`. Select a supported account type, which determines who can use the application. 
After setting the values, select Register. - -Note that it is a good idea to name the application with something unique to you like your email alias (darsch in my case) because other might use similar names like autoloader. - -![adbregister](https://raw.githubusercontent.com/Azure/Azure-DataFactory/main/SamplesV2/ChangeDataCapture/images/adbregister.png) - -5. Copy the Directory (tenant) ID and store it to use to create an application secret. - -6. Copy the Application (client) ID and store it to use to create an application secret. - -![adbappids](https://raw.githubusercontent.com/Azure/Azure-DataFactory/main/SamplesV2/ChangeDataCapture/images/adbappids.png) - -*Assign Role Permissions* [Reference](https://docs.microsoft.com/en-us/azure/databricks/spark/latest/structured-streaming/auto-loader-gen2#permissions) - -7. At storage account level assign this app the following roles to the storage account in which the input path resides: - - `Contributor`: This role is for setting up resources in your storage account, such as queues and event subscriptions. - - `Storage Queue Data Contributor`: This role is for performing queue operations such as retrieving and deleting messages from the queues. This role is required in Databricks Runtime 8.1 and above only when you provide a service principal without a connection string. - - `Storage Blob Data Contributor` to access storage - -![adbstorageiam](https://raw.githubusercontent.com/Azure/Azure-DataFactory/main/SamplesV2/ChangeDataCapture/images/adbstorageiam.png) - -8. At resource group level assign this app the following role to the related resource group: - - `EventGrid EventSubscription Contributor`: This role is for performing event grid subscription operations such as creating or listing event subscriptions. - -![adbrgiam](https://raw.githubusercontent.com/Azure/Azure-DataFactory/main/SamplesV2/ChangeDataCapture/images/adbrgiam.png) - -*Create a new application secret* - -- Select Azure Active Directory. - -- From App registrations in Azure AD, select your application. - -- Select Certificates & secrets. - -- Select Client secrets -> New client secret. - -- Provide a description `AppSecret` of the secret, and a duration. When done, select Add. - -![adbappsecret](https://raw.githubusercontent.com/Azure/Azure-DataFactory/main/SamplesV2/ChangeDataCapture/images/adbappsecret.png) - -After saving the client secret, the value of the client secret is displayed. Copy this value because you won't be able to retrieve the key later. You will provide the key value with the application ID to sign in as the application. Store the key value where your application can retrieve it. - -![adbappsecretval](https://raw.githubusercontent.com/Azure/Azure-DataFactory/main/SamplesV2/ChangeDataCapture/images/adbappsecretval.png) - -### Deploy a Key Vault and setup secrets - -Create a Key Vault in the Resource group by clicking Create - -![adbrgservices](https://raw.githubusercontent.com/Azure/Azure-DataFactory/main/SamplesV2/ChangeDataCapture/images/adbrgservices.png) - -Search for `Key vault` - -![adbkvsearch](https://raw.githubusercontent.com/Azure/Azure-DataFactory/main/SamplesV2/ChangeDataCapture/images/adbkvsearch.png) - -Click Create - -![adbkvcreate](https://raw.githubusercontent.com/Azure/Azure-DataFactory/main/SamplesV2/ChangeDataCapture/images/adbkvcreate.png) - -Create the Key Vault in the same Resource group and Region as you other resource deployed. 
Click Review and Create and then click Create - -![adbrevcreate](https://raw.githubusercontent.com/Azure/Azure-DataFactory/main/SamplesV2/ChangeDataCapture/images/adbrevcreate.png) - -You should now have a Key vault in your resources - -![adbrgwithkv](https://raw.githubusercontent.com/Azure/Azure-DataFactory/main/SamplesV2/ChangeDataCapture/images/adbrgwithkv.png) - -Open up you Key vault and add the appsecret created above - -Choose Secrets and click Generate/Import - -![adbkvsecretgen](https://raw.githubusercontent.com/Azure/Azure-DataFactory/main/SamplesV2/ChangeDataCapture/images/adbkvsecretgen.png) - -Enter you secret Name and paste in the app secret you created earlier, set activation date and click Create - -![adbcreatesecret](https://raw.githubusercontent.com/Azure/Azure-DataFactory/main/SamplesV2/ChangeDataCapture/images/adbcreatesecret.png) - -It should look like this: - -![adbfirstsecret](https://raw.githubusercontent.com/Azure/Azure-DataFactory/main/SamplesV2/ChangeDataCapture/images/adbfirstsecret.png) - -*Create the rest of the secrets you need for the notebook* - -Create the rest of the secrets in cell 4 of the notebook. The secret names are at the end of each line after EnterDatabrickSecretScopeHere - -``` -SubscriptionID = dbutils.secrets.get("","SubscriptionID") -DirectoryID = dbutils.secrets.get("","DirectoryID") -ServicePrincipalAppID = dbutils.secrets.get("","ServicePrincipalAppID") -ServicePrincipalSecret = dbutils.secrets.get("","AppSecret") -ResourceGroup = dbutils.secrets.get("","ResourceGroup") -BlobConnectionKey = dbutils.secrets.get("","Adls2-KeySecret") -``` - -``` -#Secret Names# - -SubscriptionID - -DirectoryID - -ServicePrincipalAppID - -AppSecret (already created above) - -ResourceGroup - -Adls2-KeySecret -``` - -The Adls2-KeySecret is created using the storage account key - -![secrets](https://raw.githubusercontent.com/Azure/Azure-DataFactory/main/SamplesV2/ChangeDataCapture/images/secrets.png) - - -**Create an Azure Key Vault-backed secret scope using the UI** [Reference](https://docs.microsoft.com/en-us/azure/databricks/security/secrets/secret-scopes#create-an-azure-key-vault-backed-secret-scope-using-the-ui) - -Verify that you have Contributor permission on the Azure Key Vault instance that you want to use to back the secret scope. - -Go to https://#secrets/createScope. This URL is case sensitive; The "S" in scope in createScope must be uppercase. - -https://#secrets/createScope - -In my case `https://adb-3272096941209353.13.azuredatabricks.net#secrets/createScope` - -You can find the databricks-instance in the URL of your workspace - -![adbinstance](https://raw.githubusercontent.com/Azure/Azure-DataFactory/main/SamplesV2/ChangeDataCapture/images/adbinstance.png) - -Enter Scope Name: I choose something like `demo-autoloader` which is what I used in the notebook - -Manage Principal: `All Users` - -DNS Name: `https://xxxxxx.vault.azure.net/` Find in the properties of Key vault under Vault URI - -Resource ID: Find in the properties of the Key vault. 
Looks something like this: - -``` -/subscriptions/xxxxxxxx-xxxx-xxxx-xxxx-xxxxxxxxxxxx/resourcegroups/databricks-rg/providers/Microsoft.KeyVault/vaults/databricksKV -``` -![adbsecretResID](https://raw.githubusercontent.com/Azure/Azure-DataFactory/main/SamplesV2/ChangeDataCapture/images/adbsecretResID.png) - -Click Create - -![adbsecretscope](https://raw.githubusercontent.com/Azure/Azure-DataFactory/main/SamplesV2/ChangeDataCapture/images/adbsecretscope.png) - - -### Create a Databricks Cluster and attach to notebook - -Create a cluster using the Runtime 8.3 or above - -Enter Cluster Name, Runtime Version, Set Terminate after, Min Workers, Max Workers and click Create Cluster - -![adbcreatecluster](https://raw.githubusercontent.com/Azure/Azure-DataFactory/main/SamplesV2/ChangeDataCapture/images/adbcreatecluster.png) - -### Add the Scopes into Cells 2 and 4 - -Change the value of "" in cell 2 and 4 to the Scope name you created earlier. - -In my chase `demo-autoloader` so the 2 cell would read: - -``` -spark.conf.set( - "fs.azure.account.key." + adlsAccountName + ".dfs.core.windows.net", - dbutils.secrets.get(scope="demo-autoloader",key="Adls2-KeySecret")) -``` - -The 4 cell would read: - -``` -SubscriptionID = dbutils.secrets.get("demo-autoloader","SubscriptionID") -DirectoryID = dbutils.secrets.get("demo-autoloader","DirectoryID") -ServicePrincipalAppID = dbutils.secrets.get("demo-autoloader","ServicePrincipalAppID") -ServicePrincipalSecret = dbutils.secrets.get("demo-autoloader","AppSecret") -ResourceGroup = dbutils.secrets.get("demo-autoloader","ResourceGroup") -BlobConnectionKey = dbutils.secrets.get("demo-autoloader","Adls2-KeySecret") -``` - -### Run the notebook one cell at a time (at least the first time) - -Once the cluster is started you will be able to run the code in the cells - -Click on Run Cell - -![adbcruncell](https://raw.githubusercontent.com/Azure/Azure-DataFactory/main/SamplesV2/ChangeDataCapture/images/adbcruncell.png) - -Do this for the next cell down etc. - -You can skip cell 6 the first time because nothing has been mounted. You may get an error like this: - -![adbunmount](https://raw.githubusercontent.com/Azure/Azure-DataFactory/main/SamplesV2/ChangeDataCapture/images/adbunmount.png) - -When running the Notebook the first time, just move on to cell 7 to mount the Source and Sink file system - -The first time to run cell 16 comment out the 2 lines it references by putting # at beginning of each line. - -``` - #.foreachBatch(upsertToDelta) # Comment this out first time you run - #.queryName("c-changeLoader-merge") # Comment this out first time you run -``` - -![adbcell16](https://raw.githubusercontent.com/Azure/Azure-DataFactory/main/SamplesV2/ChangeDataCapture/images/adbcell16.png) - -After you run this the first time uncomment the 2 lines because you will want the upsert to run. If you cancel the streaming and then run the cell again it will continue to stream as long as trigger once is commented out. 
- -``` -#.trigger(once=True) -``` - -Also notice that running the notebook has created a `Event Grid System Topic` in the resources - -![adbeventgrid](https://raw.githubusercontent.com/Azure/Azure-DataFactory/main/SamplesV2/ChangeDataCapture/images/adbeventgrid.png) - -When you run the last cell 19 you should see 3 records for Everett WA - -![adbcell19](https://raw.githubusercontent.com/Azure/Azure-DataFactory/main/SamplesV2/ChangeDataCapture/images/adbcell19.png) - -### Go make a change to the Address table in Azure SQL Database, Run ADF Pipeline, and rerun cell 16 and 19 - -In a SQL Editor like Azure Data Studio or the browser run the following SQL to insert a new row - -``` -INSERT INTO [SalesLT].[Address] - ([AddressLine1], [AddressLine2], [City], [StateProvince] ,[CountryRegion], [PostalCode]) - VALUES - ('138th Drive', NULL, 'Everett', 'WA', 'USA', '98208') -``` - -You can cut and paste or use the following SQL script [InsertAddress.sql](https://github.com/Azure/Azure-DataFactory/blob/main/SamplesV2/ChangeDataCapture/usecases/cdc/code/sqlscripts/InsertAddress.sql) - -Rerun the ADF Pipeline - -Click on Debug, enter the name of the Control table `ControlTableForSourceToSink` Click OK - -![adfDebugPipelineRun](https://raw.githubusercontent.com/Azure/Azure-DataFactory/main/SamplesV2/ChangeDataCapture/images/adfDebugPipelineRun.png) - -Rerun cell 16 (if not already streaming) and 19 in the `autoloadersp` notebook - -Make sure rows 4 and 5 of the code are uncommented so that the upsertToDelta function runs for updates - -![adbcell16again](https://raw.githubusercontent.com/Azure/Azure-DataFactory/main/SamplesV2/ChangeDataCapture/images/adbcell16again.png) - -This time you should see that new record - -![adbcell19again](https://raw.githubusercontent.com/Azure/Azure-DataFactory/main/SamplesV2/ChangeDataCapture/images/adbcell19again.png) - -If you want to try an insert and update try these. Remember that you need to run the ADF pipeline after the insert, check that the insert happens in the Delta table, and then ADF pipeline again after the update. Note you might have to change the AddressID = 11383 to a different AddressID if records got inserted in a different order. - -``` -INSERT INTO [SalesLT].[Address] - ([AddressLine1], [AddressLine2], [City], [StateProvince] ,[CountryRegion], [PostalCode]) - VALUES - ('2nd Demo Drive', NULL, 'Everett', 'WA', 'USA', '98208') -``` - -``` -Update [SalesLT].[Address] set AddressLine1 = 'Second Demo Drive', ModifiedDate = getdate() where AddressID = 11383 -``` - +# Change Data Capture Steps + +Please follow these steps to configure the sample + +## Step 1 Create Control Table and Stored Procedure used by Azure Data Factory + +You can create the tables by connecting to the Azure SQL Database deployed by the ARM template using a tool like [Azure Data Studio](https://docs.microsoft.com/en-us/sql/azure-data-studio/download-azure-data-studio?view=sql-server-ver15) + +Use the following SQL script [ControlTableForSourceToSink.sql](https://github.com/Azure/Azure-DataFactory/blob/main/SamplesV2/ChangeDataCapture/usecases/cdc/code/sqlscripts/ControlTableForSourceToSink.sql) to create the ControlTableForSourceToSink table in the database deployed by the ARM template. 
+ +![Step 1 table](https://raw.githubusercontent.com/Azure/Azure-DataFactory/main/SamplesV2/ChangeDataCapture/images/cdcstep1table.png) + +Use the following SQL script [spUpdateWatermark.sql](https://github.com/Azure/Azure-DataFactory/blob/main/SamplesV2/ChangeDataCapture/usecases/cdc/code/sqlscripts/spUpdateWatermark.sql) to create the spUpdateWatermark stored procedure in the database deployed by the ARM template. + +![Step 1 sproc](https://raw.githubusercontent.com/Azure/Azure-DataFactory/main/SamplesV2/ChangeDataCapture/images/cdcstep1sproc.png) + +Use the following SQL script [CreateStudent.sql](https://github.com/Azure/Azure-DataFactory/blob/main/SamplesV2/ChangeDataCapture/usecases/cdc/code/sqlscripts/CreateStudent.sql) to create the studentMath table in the database deployed by the ARM template. + +![Step 1 student](https://raw.githubusercontent.com/Azure/Azure-DataFactory/main/SamplesV2/ChangeDataCapture/images/cdcstep1student.png) + +## Step 2 Create Azure Data Factory Pipeline from local Template + +Download [ADF Template zip](https://github.com/Azure/Azure-DataFactory/tree/main/SamplesV2/ChangeDataCapture/usecases/cdc/code/adfTemplates) or find it in your cloned GitHub Repo. + +![adftemplatezip](https://raw.githubusercontent.com/Azure/Azure-DataFactory/main/SamplesV2/ChangeDataCapture/images/adftemplatezip.png) + +Open up the ADF deployed by the ARM template. Select Pipeline > Import from pipeline template + +![adfplfromtemplate](https://raw.githubusercontent.com/Azure/Azure-DataFactory/main/SamplesV2/ChangeDataCapture/images/adfplfromtemplate.png) + +Click on the zip file and click Open + +![adfOpenLocalTemplate](https://raw.githubusercontent.com/Azure/Azure-DataFactory/main/SamplesV2/ChangeDataCapture/images/adfOpenLocalTemplate.png) + +It should look like this + +![adftemplateUserinputs](https://raw.githubusercontent.com/Azure/Azure-DataFactory/main/SamplesV2/ChangeDataCapture/images/adftemplateUserinputs.png) + +Select +New in the first User input and create an Azure SQL Database Linked Service to the database deployed by the ARM template. 
+ +![adfDatabaseLinkedService](https://raw.githubusercontent.com/Azure/Azure-DataFactory/main/SamplesV2/ChangeDataCapture/images/adfDatabaseLinkedService.png) + +For User input 2 select the same Database you chose in User input 1 + +Select +New in the third User input and create an Azure Data Lake Storage Gen2 Linked Service + +![adfAdlsLinkedService](https://raw.githubusercontent.com/Azure/Azure-DataFactory/main/SamplesV2/ChangeDataCapture/images/adfAdlsLinkedService.png) + +Then click on Use this template + +![adfAllUserinputs.png](https://raw.githubusercontent.com/Azure/Azure-DataFactory/main/SamplesV2/ChangeDataCapture/images/adfAllUserinputs.png) + +It should look like this when it is imported + +![adfTemplateImported](https://raw.githubusercontent.com/Azure/Azure-DataFactory/main/SamplesV2/ChangeDataCapture/images/adfTemplateImported.png) + +## Step 3 Debug the DeltaCopyAndFullCopyfromDB_using_ControlTable Pipeline + +Click on Debug, enter the name of the Control table `ControlTableForSourceToSink`, +and click OK + +![adfDebugPipelineRun](https://raw.githubusercontent.com/Azure/Azure-DataFactory/main/SamplesV2/ChangeDataCapture/images/adfDebugPipelineRun.png) + +Once the pipeline runs successfully it should look like this + +![adfSuccessfulRun](https://raw.githubusercontent.com/Azure/Azure-DataFactory/main/SamplesV2/ChangeDataCapture/images/adfSuccessfulRun.png) + +Check that the files have been created in Storage using [Azure Storage Explorer](https://azure.microsoft.com/en-us/features/storage-explorer/) or the Azure Portal in the browser. The files should be in the bronze container at a path like `CDC/Sales/Microsoft/AdventureWorksLT/SalesLT/Address/` + +![adfFileInStorage](https://raw.githubusercontent.com/Azure/Azure-DataFactory/main/SamplesV2/ChangeDataCapture/images/adfFileInStorage.png) + +If you run the pipeline a second time you will see another file created. Because the Address table has a ModifiedDate column that can be used as a watermark, the second file (only 102 bytes) contains just a header, since there were no changes (unless some updates are made to the Address table) + +![adfFileInStorage2](https://raw.githubusercontent.com/Azure/Azure-DataFactory/main/SamplesV2/ChangeDataCapture/images/adfFileInStorage2.png) + +If you compare this to the studentMath files (the studentMath table does not have a watermark column) they are all the same size because each run is a full copy rather than a delta. The files will get larger as inserts and updates happen in the studentMath table. 
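To make the watermark behavior concrete, the delta copy follows the standard high-watermark pattern: each run copies only the rows whose ModifiedDate falls between the last stored watermark and the current one, then uses spUpdateWatermark to advance the value in ControlTableForSourceToSink. The query below is only a sketch of that pattern with placeholder watermark values, not the exact query the template generates.

```
-- Sketch of the watermark pattern (placeholder values, not the template's exact query)
SELECT *
FROM [SalesLT].[Address]
WHERE [ModifiedDate] > '<old watermark read from ControlTableForSourceToSink>'
  AND [ModifiedDate] <= '<new watermark captured for this run>'
-- After a successful copy, spUpdateWatermark stores the new watermark back in the control table
```

This is why a run with no changes produces a header-only file for Address, while studentMath, which has no watermark column, is copied in full every time.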
+ +![adfFileInStorage3](https://raw.githubusercontent.com/Azure/Azure-DataFactory/main/SamplesV2/ChangeDataCapture/images/adfFileInStorage3.png) + + +You can now save the pipeline by clicking on Publish all + +![adfPublishAll](https://raw.githubusercontent.com/Azure/Azure-DataFactory/main/SamplesV2/ChangeDataCapture/images/adfPublishAll.png) + +## Step 4 Import, configure, and run the Databricks notebook + +### Requirements + +- Databricks Runtime 8.3 or above when you create your cluster + +- Set up permissions to ADLS Gen2 + +- Secrets in Key vault + +*Steps* + +### Import the Databricks notebook + +Open up your Databricks workspace, navigate to your user, select the dropdown, and select Import + +![adbworkspace](https://raw.githubusercontent.com/Azure/Azure-DataFactory/main/SamplesV2/ChangeDataCapture/images/adbworkspace.png) + +Import from file if you cloned the repo locally, or enter the URL `https://raw.githubusercontent.com/Azure/Azure-DataFactory/main/SamplesV2/ChangeDataCapture/usecases/cdc/code/notebooks/autoloadersp.ipynb` of the notebook in the GitHub repo [autoloadersp.ipynb](https://raw.githubusercontent.com/Azure/Azure-DataFactory/main/SamplesV2/ChangeDataCapture/usecases/cdc/code/notebooks/autoloadersp.ipynb), and click Import + +![adbnotebookimport](https://raw.githubusercontent.com/Azure/Azure-DataFactory/main/SamplesV2/ChangeDataCapture/images/adbnotebookimport.png) + +You should now have a notebook that looks like this: + +![adbnotebook](https://raw.githubusercontent.com/Azure/Azure-DataFactory/main/SamplesV2/ChangeDataCapture/images/adbnotebook.png) + +Change the value of adlsAccountName = "" in cell 1 to the ADLS account name in your deployment + +In my case my deployment has a Storage account name of `adfacceler7kdgtkhj5mpoa` so the first row of the cell would read: + +``` +adlsAccountName = "adfacceler7kdgtkhj5mpoa" +``` + +![adbrgservices](https://raw.githubusercontent.com/Azure/Azure-DataFactory/main/SamplesV2/ChangeDataCapture/images/adbrgservices.png) + +Note that if you change any column values in the `ControlTableForSourceToSink` table, make the corresponding changes to the folder path variables. + +![adbfolderpath](https://raw.githubusercontent.com/Azure/Azure-DataFactory/main/SamplesV2/ChangeDataCapture/images/adbfolderpath.png) + +The notebook would now look like this: + +``` +sourceAdlsFolderName = "CDC/Sales/Microsoft/AdventureWorksLT/SalesLT/Address" +sinkAdlsFolderName = "CDC/Sales/Microsoft/AdventureWorksLT/SalesLT/Address" +``` + +![adbadlsacctname](https://raw.githubusercontent.com/Azure/Azure-DataFactory/main/SamplesV2/ChangeDataCapture/images/adbadlsacctname.png) + +### Configure Service Principal and Permissions + +*Create a Service principal* [Reference](https://docs.microsoft.com/en-us/azure/active-directory/develop/howto-create-service-principal-portal#register-an-application-with-azure-ad-and-create-a-service-principal) + +Create an [Azure Active Directory app and service principal](https://docs.microsoft.com/en-us/azure/active-directory/develop/howto-create-service-principal-portal); you will use its client ID and client secret. + +1. Sign in to your Azure Account through the Azure portal. + +2. Select Azure Active Directory. + +3. Select App registrations. + +![adbappreg](https://raw.githubusercontent.com/Azure/Azure-DataFactory/main/SamplesV2/ChangeDataCapture/images/adbappreg.png) + +4. Select New registration. + +Name the application something like `autoloader-darsch`. Select a supported account type, which determines who can use the application. 
After setting the values, select Register. + +Note that it is a good idea to name the application with something unique to you, like your email alias (darsch in my case), because others might use similar names like autoloader. + +![adbregister](https://raw.githubusercontent.com/Azure/Azure-DataFactory/main/SamplesV2/ChangeDataCapture/images/adbregister.png) + +5. Copy the Directory (tenant) ID and store it to use later. + +6. Copy the Application (client) ID and store it to use later. + +![adbappids](https://raw.githubusercontent.com/Azure/Azure-DataFactory/main/SamplesV2/ChangeDataCapture/images/adbappids.png) + +*Assign Role Permissions* [Reference](https://docs.microsoft.com/en-us/azure/databricks/spark/latest/structured-streaming/auto-loader-gen2#permissions) + +7. At the storage account level, assign this app the following roles on the storage account in which the input path resides: + + `Contributor`: This role is for setting up resources in your storage account, such as queues and event subscriptions. + + `Storage Queue Data Contributor`: This role is for performing queue operations such as retrieving and deleting messages from the queues. This role is required in Databricks Runtime 8.1 and above only when you provide a service principal without a connection string. + + `Storage Blob Data Contributor`: This role is for accessing (reading and writing) the data in storage. + +![adbstorageiam](https://raw.githubusercontent.com/Azure/Azure-DataFactory/main/SamplesV2/ChangeDataCapture/images/adbstorageiam.png) + +8. At the resource group level, assign this app the following role on the related resource group: + + `EventGrid EventSubscription Contributor`: This role is for performing event grid subscription operations such as creating or listing event subscriptions. + +![adbrgiam](https://raw.githubusercontent.com/Azure/Azure-DataFactory/main/SamplesV2/ChangeDataCapture/images/adbrgiam.png) + +*Create a new application secret* + +- Select Azure Active Directory. + +- From App registrations in Azure AD, select your application. + +- Select Certificates & secrets. + +- Select Client secrets -> New client secret. + +- Provide a description of the secret, such as `AppSecret`, and a duration. When done, select Add. + +![adbappsecret](https://raw.githubusercontent.com/Azure/Azure-DataFactory/main/SamplesV2/ChangeDataCapture/images/adbappsecret.png) + +After saving the client secret, the value of the client secret is displayed. Copy this value because you won't be able to retrieve the key later. You will provide the key value with the application ID to sign in as the application. Store the key value where your application can retrieve it. + +![adbappsecretval](https://raw.githubusercontent.com/Azure/Azure-DataFactory/main/SamplesV2/ChangeDataCapture/images/adbappsecretval.png) + +### Deploy a Key Vault and set up secrets + +Create a Key Vault in the Resource group by clicking Create + +![adbrgservices](https://raw.githubusercontent.com/Azure/Azure-DataFactory/main/SamplesV2/ChangeDataCapture/images/adbrgservices.png) + +Search for `Key vault` + +![adbkvsearch](https://raw.githubusercontent.com/Azure/Azure-DataFactory/main/SamplesV2/ChangeDataCapture/images/adbkvsearch.png) + +Click Create + +![adbkvcreate](https://raw.githubusercontent.com/Azure/Azure-DataFactory/main/SamplesV2/ChangeDataCapture/images/adbkvcreate.png) + +Create the Key Vault in the same Resource group and Region as your other deployed resources. 
Click Review and Create and then click Create + +![adbrevcreate](https://raw.githubusercontent.com/Azure/Azure-DataFactory/main/SamplesV2/ChangeDataCapture/images/adbrevcreate.png) + +You should now have a Key vault in your resources + +![adbrgwithkv](https://raw.githubusercontent.com/Azure/Azure-DataFactory/main/SamplesV2/ChangeDataCapture/images/adbrgwithkv.png) + +Open up your Key vault and add the app secret created above + +Choose Secrets and click Generate/Import + +![adbkvsecretgen](https://raw.githubusercontent.com/Azure/Azure-DataFactory/main/SamplesV2/ChangeDataCapture/images/adbkvsecretgen.png) + +Enter your secret Name, paste in the app secret you created earlier, set the activation date, and click Create + +![adbcreatesecret](https://raw.githubusercontent.com/Azure/Azure-DataFactory/main/SamplesV2/ChangeDataCapture/images/adbcreatesecret.png) + +It should look like this: + +![adbfirstsecret](https://raw.githubusercontent.com/Azure/Azure-DataFactory/main/SamplesV2/ChangeDataCapture/images/adbfirstsecret.png) + +*Create the rest of the secrets you need for the notebook* + +Create the rest of the secrets used in cell 4 of the notebook. The secret names are the last argument at the end of each line; the first argument is the secret scope (shown here as ""), which you will fill in later + +``` +SubscriptionID = dbutils.secrets.get("","SubscriptionID") +DirectoryID = dbutils.secrets.get("","DirectoryID") +ServicePrincipalAppID = dbutils.secrets.get("","ServicePrincipalAppID") +ServicePrincipalSecret = dbutils.secrets.get("","AppSecret") +ResourceGroup = dbutils.secrets.get("","ResourceGroup") +BlobConnectionKey = dbutils.secrets.get("","Adls2-KeySecret") +``` + +``` +#Secret Names# + +SubscriptionID + +DirectoryID + +ServicePrincipalAppID + +AppSecret (already created above) + +ResourceGroup + +Adls2-KeySecret +``` + +The Adls2-KeySecret is created using the storage account key + +![secrets](https://raw.githubusercontent.com/Azure/Azure-DataFactory/main/SamplesV2/ChangeDataCapture/images/secrets.png) + + +**Create an Azure Key Vault-backed secret scope using the UI** [Reference](https://docs.microsoft.com/en-us/azure/databricks/security/secrets/secret-scopes#create-an-azure-key-vault-backed-secret-scope-using-the-ui) + +Verify that you have Contributor permission on the Azure Key Vault instance that you want to use to back the secret scope. + +Go to `https://<databricks-instance>#secrets/createScope`. This URL is case sensitive; the "S" in createScope must be uppercase. + +In my case `https://adb-3272096941209353.13.azuredatabricks.net#secrets/createScope` + +You can find the databricks-instance in the URL of your workspace + +![adbinstance](https://raw.githubusercontent.com/Azure/Azure-DataFactory/main/SamplesV2/ChangeDataCapture/images/adbinstance.png) + +Enter Scope Name: I chose something like `demo-autoloader`, which is what I used in the notebook + +Manage Principal: `All Users` + +DNS Name: `https://xxxxxx.vault.azure.net/` Find this in the properties of the Key vault under Vault URI + +Resource ID: Find this in the properties of the Key vault. 
Looks something like this: + +``` +/subscriptions/xxxxxxxx-xxxx-xxxx-xxxx-xxxxxxxxxxxx/resourcegroups/databricks-rg/providers/Microsoft.KeyVault/vaults/databricksKV +``` +![adbsecretResID](https://raw.githubusercontent.com/Azure/Azure-DataFactory/main/SamplesV2/ChangeDataCapture/images/adbsecretResID.png) + +Click Create + +![adbsecretscope](https://raw.githubusercontent.com/Azure/Azure-DataFactory/main/SamplesV2/ChangeDataCapture/images/adbsecretscope.png) + + +### Create a Databricks Cluster and attach to notebook + +Create a cluster using Runtime 8.3 or above + +Enter the Cluster Name, Runtime Version, Terminate after setting, Min Workers, and Max Workers, and click Create Cluster + +![adbcreatecluster](https://raw.githubusercontent.com/Azure/Azure-DataFactory/main/SamplesV2/ChangeDataCapture/images/adbcreatecluster.png) + +### Add the Scopes into Cells 2 and 4 + +Change the value of "" in cells 2 and 4 to the Scope name you created earlier. + +In my case that is `demo-autoloader`, so cell 2 would read: + +``` +spark.conf.set( + "fs.azure.account.key." + adlsAccountName + ".dfs.core.windows.net", + dbutils.secrets.get(scope="demo-autoloader",key="Adls2-KeySecret")) +``` + +Cell 4 would read: + +``` +SubscriptionID = dbutils.secrets.get("demo-autoloader","SubscriptionID") +DirectoryID = dbutils.secrets.get("demo-autoloader","DirectoryID") +ServicePrincipalAppID = dbutils.secrets.get("demo-autoloader","ServicePrincipalAppID") +ServicePrincipalSecret = dbutils.secrets.get("demo-autoloader","AppSecret") +ResourceGroup = dbutils.secrets.get("demo-autoloader","ResourceGroup") +BlobConnectionKey = dbutils.secrets.get("demo-autoloader","Adls2-KeySecret") +``` + +### Run the notebook one cell at a time (at least the first time) + +Once the cluster is started, you will be able to run the code in the cells + +Click on Run Cell + +![adbcruncell](https://raw.githubusercontent.com/Azure/Azure-DataFactory/main/SamplesV2/ChangeDataCapture/images/adbcruncell.png) + +Do this for each subsequent cell. + +You can skip cell 6 the first time because nothing has been mounted yet. If you do run it, you may get an error like this: + +![adbunmount](https://raw.githubusercontent.com/Azure/Azure-DataFactory/main/SamplesV2/ChangeDataCapture/images/adbunmount.png) + +When running the notebook the first time, just move on to cell 7 to mount the Source and Sink file systems + +The first time you run cell 16, leave the # comments on the 2 lines shown below + +``` + #.foreachBatch(upsertToDelta) # Comment this out first time you run + #.queryName("c-changeLoader-merge") # Comment this out first time you run +``` + +![adbcell16](https://raw.githubusercontent.com/Azure/Azure-DataFactory/main/SamplesV2/ChangeDataCapture/images/adbcell16.png) + +After you run this the first time, uncomment the 2 lines because you will want the upsert to run. If you cancel the streaming and then run the cell again, it will continue to stream as long as the trigger-once line is commented out. 
+ +``` +#.trigger(once=True) +``` + +Also notice that running the notebook has created an `Event Grid System Topic` in the resource group + +![adbeventgrid](https://raw.githubusercontent.com/Azure/Azure-DataFactory/main/SamplesV2/ChangeDataCapture/images/adbeventgrid.png) + +When you run the last cell, cell 19, you should see 3 records for Everett, WA + +![adbcell19](https://raw.githubusercontent.com/Azure/Azure-DataFactory/main/SamplesV2/ChangeDataCapture/images/adbcell19.png) + +### Go make a change to the Address table in Azure SQL Database, run the ADF Pipeline, and rerun cells 16 and 19 + +In a SQL editor like Azure Data Studio, or in the browser, run the following SQL to insert a new row + +``` +INSERT INTO [SalesLT].[Address] + ([AddressLine1], [AddressLine2], [City], [StateProvince] ,[CountryRegion], [PostalCode]) + VALUES + ('138th Drive', NULL, 'Everett', 'WA', 'USA', '98208') +``` + +You can cut and paste or use the following SQL script [InsertAddress.sql](https://github.com/Azure/Azure-DataFactory/blob/main/SamplesV2/ChangeDataCapture/usecases/cdc/code/sqlscripts/InsertAddress.sql) + +Rerun the ADF Pipeline + +Click on Debug, enter the name of the Control table `ControlTableForSourceToSink`, and click OK + +![adfDebugPipelineRun](https://raw.githubusercontent.com/Azure/Azure-DataFactory/main/SamplesV2/ChangeDataCapture/images/adfDebugPipelineRun.png) + +Rerun cells 16 (if not already streaming) and 19 in the `autoloadersp` notebook + +Make sure lines 4 and 5 of the code are uncommented so that the upsertToDelta function runs for updates + +![adbcell16again](https://raw.githubusercontent.com/Azure/Azure-DataFactory/main/SamplesV2/ChangeDataCapture/images/adbcell16again.png) + +This time you should see that new record + +![adbcell19again](https://raw.githubusercontent.com/Azure/Azure-DataFactory/main/SamplesV2/ChangeDataCapture/images/adbcell19again.png) + +If you want to try another insert and an update, try these. Remember that you need to run the ADF pipeline after the insert, check that the insert shows up in the Delta table, and then run the ADF pipeline again after the update. Note that you might have to change AddressID = 11383 to a different AddressID if records got inserted in a different order. + +``` +INSERT INTO [SalesLT].[Address] + ([AddressLine1], [AddressLine2], [City], [StateProvince] ,[CountryRegion], [PostalCode]) + VALUES + ('2nd Demo Drive', NULL, 'Everett', 'WA', 'USA', '98208') +``` + +``` +UPDATE [SalesLT].[Address] SET AddressLine1 = 'Second Demo Drive', ModifiedDate = getdate() WHERE AddressID = 11383 +``` + +![adbcell19yetagain](https://raw.githubusercontent.com/Azure/Azure-DataFactory/main/SamplesV2/ChangeDataCapture/images/adbcell19yetagain.png) \ No newline at end of file
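For context on what cell 16 wires in with `foreachBatch(upsertToDelta)`: the function applies each streaming micro-batch that Auto Loader picks up from the bronze files to the sink Delta table as a merge (upsert). The sketch below only illustrates that pattern; it is not the notebook's exact code, and the sink path and the use of `AddressID` as the merge key are assumptions made for the example.

```
from delta.tables import DeltaTable

# Illustrative only: adjust the path to wherever your sink Delta table lives
sinkDeltaPath = "/mnt/silver/CDC/Sales/Microsoft/AdventureWorksLT/SalesLT/Address"

def upsertToDelta(microBatchDF, batchId):
    # spark is the SparkSession provided by the Databricks notebook
    # Merge the micro-batch into the sink Delta table, matching on AddressID
    # (assumed key): update rows that already exist, insert the rest
    deltaTable = DeltaTable.forPath(spark, sinkDeltaPath)
    (deltaTable.alias("t")
        .merge(microBatchDF.alias("s"), "t.AddressID = s.AddressID")
        .whenMatchedUpdateAll()
        .whenNotMatchedInsertAll()
        .execute())
```

With lines 4 and 5 of cell 16 uncommented, each batch of change files that the ADF pipeline lands in bronze gets applied to the Delta table this way, which is why the inserted and updated Everett rows show up when you rerun cell 19.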