diff --git a/docs/assets/img/unity_export_hook_result_log.png b/docs/assets/img/unity_export_hook_result_log.png new file mode 100644 index 00000000000..c954195ffc2 Binary files /dev/null and b/docs/assets/img/unity_export_hook_result_log.png differ diff --git a/docs/assets/img/unity_exported_table_columns.png b/docs/assets/img/unity_exported_table_columns.png new file mode 100644 index 00000000000..c31356695c0 Binary files /dev/null and b/docs/assets/img/unity_exported_table_columns.png differ diff --git a/docs/howto/catalog_exports.md b/docs/howto/catalog_exports.md index 8bc3fa0d4a6..c0006590a7b 100644 --- a/docs/howto/catalog_exports.md +++ b/docs/howto/catalog_exports.md @@ -76,9 +76,12 @@ Exporters are code packages accessible through [Lua integration]({% link howto/h #### Currently supported exporters -- Symlink Exporter: Writes metadata for the table using Hive's [SymlinkTextInputFormat](https://svn.apache.org/repos/infra/websites/production/hive/content/javadocs/r2.1.1/api/org/apache/hadoop/hive/ql/io/SymlinkTextInputFormat.html) -- AWS Glue Catalog (+ Athena) Exporter: Creates a table in Glue using Hive's format and updates the location to symlink files (reuses Symlink Exporter). -- See a step by step guide on how to integrate with [Glue Exporter]({% link integrations/glue_metastore.md %}) +| Exporter | Description | Notes | +|:-----------------------------------------|:------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------|:---------------------------------------------------------------------------------------------------------------------------------------------------------------------| +| **Symlink exporter** | Writes metadata for the table using Hive's [SymlinkTextInputFormat](https://svn.apache.org/repos/infra/websites/production/hive/content/javadocs/r2.1.1/api/org/apache/hadoop/hive/ql/io/SymlinkTextInputFormat.html) | | +| **AWS Glue Catalog (+ Athena) exporter** | Creates a table in Glue using Hive's format and updates the location to symlink files (reuses Symlink Exporter). | See a step-by-step guide on how to integrate with [Glue Exporter]({% link integrations/glue_metastore.md %}) | +| **Delta Lake table exporter** | Export Delta Lake tables from lakeFS to an external storage | | +| **Unity Catalog exporter** | The Unity Catalog exporter serves the purpose of registering a Delta Lake table in Unity Catalog. It operates in conjunction with the Delta Lake exporter. In this workflow, the Delta Lake exporter is utilized to export a Delta Lake table from lakeFS. Subsequently, the obtained result is passed to the Unity Catalog exporter to facilitate its registration within Unity Catalog. | See a step-by-step guide on how to integrate with [Unity Catalog Exporter]({% link integrations/unity_catalog.md %})
Currently, only AWS S3 storage is supported | #### Running an Exporter @@ -139,4 +142,4 @@ sequenceDiagram Exporter->>Catalog: register object store location Query Engine-->Catalog: Query Query Engine-->Object Store: Query -``` \ No newline at end of file +``` diff --git a/docs/howto/hooks/lua.md b/docs/howto/hooks/lua.md index a79f1784795..03a175c0283 100644 --- a/docs/howto/hooks/lua.md +++ b/docs/howto/hooks/lua.md @@ -220,6 +220,57 @@ Returns the MD5 digest (string) of the given data Returns the SHA256 digest (string) of the given data +### `databricks/client(databricks_host, databricks_service_principal_token)` + +Returns a table representing a Databricks client with the `register_external_table` and `create_or_get_schema` methods. + +### `databricks/client.create_schema(schema_name, catalog_name, get_if_exists)` + +Creates a schema, or retrieves it if exists, in the configured Databricks host's Unity catalog. +If a schema doesn't exist, a new schema with the given `schema_name` will be created under the given `catalog_name`. +Returns the created/fetched schema name. + +Parameters: + +- `schema_name(string)`: The required schema name +- `catalog_name(string)`: The catalog name under which the schema will be created (or from which it will be fetched) +- `get_if_exists(boolean)`: In case of failure due to an existing schema with the given `schema_name` in the given +`catalog_name`, return the schema. + +Example: + +```lua +local databricks = require("databricks") +local client = databricks.client("https://my-host.cloud.databricks.com", "my-service-principal-token") +local schema_name = client.create_schema("main", "mycatalog", true) +``` + +### `databricks/client.register_external_table(table_name, physical_path, warehouse_id, catalog_name, schema_name)` + +Registers an external table under the provided warehouse ID, catalog name, and schema name. +In order for this method call to succeed, an external location should be configured in the catalog, with the +`physical_path`'s root storage URI (for example: `s3://mybucket`). +Returns the table's creation status. + +Parameters: + +- `table_name(string)`: Table name. +- `physical_path(string)`: A location to which the external table will refer, e.g. `s3://mybucket/the/path/to/mytable`. +- `warehouse_id(string)`: The SQL warehouse ID used in Databricks to run the `CREATE TABLE` query (fetched from the SQL warehouse +`Connection Details`, or by running `databricks warehouses get`, choosing your SQL warehouse and fetching its ID). +- `catalog_name(string)`: The name of the catalog under which a schema will be created (or fetched from). +- `schema_name(string)`: The name of the schema under which the table will be created. + +Example: + +```lua +local databricks = require("databricks") +local client = databricks.client("https://my-host.cloud.databricks.com", "my-service-principal-token") +local status = client.register_external_table("mytable", "s3://mybucket/the/path/to/mytable", "examwarehouseple", "my-catalog-name", "myschema") +``` + +- For the Databricks permissions needed to run this method, check out the [Unity Catalog Exporter]({% link integrations/unity_catalog.md %}) docs. + ### `encoding/base64/encode(data)` Encodes the given data to a base64 string @@ -348,6 +399,76 @@ Returns an object-wise diff of uncommitted changes on `branch_id`. Returns a stat object for the given path under the given reference and repository. +### `lakefs/catalogexport/glue_exporter.get_full_table_name(descriptor, action_info)` + +Generate glue table name. 
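+
+A minimal usage sketch inside a Lua hook (the `_lakefs_tables/my_table.yaml` descriptor path is only an example; the parameters are described below):
+
+```lua
+local lakefs = require("lakefs")
+local extractor = require("lakefs/catalogexport/table_extractor")
+local glue_exporter = require("lakefs/catalogexport/glue_exporter")
+
+-- load the table descriptor committed under _lakefs_tables/ (hypothetical path)
+local descriptor = extractor.get_table_descriptor(lakefs, action.repository_id, action.commit_id, "_lakefs_tables/my_table.yaml")
+-- derive the Glue table name from the descriptor and the hook's global `action` object
+local glue_table_name = glue_exporter.get_full_table_name(descriptor, action)
+```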
+ +Parameters: + +- `descriptor(Table)`: Object from (e.g. _lakefs_tables/my_table.yaml). +- `action_info(Table)`: The global action object. + +### `lakefs/catalogexport/delta_exporter` + +A package used to export Delta Lake tables from lakeFS to an external cloud storage. + +### `lakefs/catalogexport/delta_exporter.export_delta_log(action, table_names, writer, delta_client, table_descriptors_path)` + +The function used to export Delta Lake tables. +The return value is a table with mapping of table names to external table location (from which it is possible to query the data). + +Parameters: + +- `action`: The global action object +- `table_names`: Delta tables name list (e.g. `{"table1", "table2"}`) +- `writer`: A writer function with `function(bucket, key, data)` signature, used to write the exported Delta Log (e.g. `aws/s3.s3_client.put_object`) +- `delta_client`: A Delta Lake client that implements `get_table: function(repo, ref, prefix)` +- `table_descriptors_path`: The path under which the table descriptors of the provided `table_names` reside + +Example: + +```yaml +--- +name: delta_exporter +on: + post-commit: null +hooks: + - id: delta_export + type: lua + properties: + script: | + local aws = require("aws") + local formats = require("formats") + local delta_exporter = require("lakefs/catalogexport/delta_exporter") + + local table_descriptors_path = "_lakefs_tables" + local sc = aws.s3_client(args.aws.access_key_id, args.aws.secret_access_key, args.aws.region) + local delta_client = formats.delta_client(args.lakefs.access_key_id, args.lakefs.secret_access_key, args.aws.region) + local delta_table_locations = delta_exporter.export_delta_log(action, args.table_names, sc.put_object, delta_client, table_descriptors_path) + + for t, loc in pairs(delta_table_locations) do + print("Delta Lake exported table \"" .. t .. "\"'s location: " .. loc .. "\n") + end + args: + aws: + access_key_id: + secret_access_key: + region: us-east-1 + lakefs: + access_key_id: + secret_access_key: + table_names: + - mytable +``` + +For the table descriptor under the `_lakefs_tables/mytable.yaml`: +```yaml +--- +name: myTableActualName +type: delta +path: a/path/to/my/delta/table +``` + ### `lakefs/catalogexport/table_extractor` Utility package to parse `_lakefs_tables/` descriptors. @@ -506,7 +627,7 @@ Example: ```yaml --- -name: test_delta_exporter +name: delta_exporter on: post-commit: null hooks: @@ -538,13 +659,61 @@ hooks: - my/delta/table/path ``` -For the table descriptor under the `_lakefs_tables/my/delta/table/path.yaml`: +### `lakefs/catalogexport/unity_exporter` + +A package used to register exported Delta Lake tables to Databricks' Unity catalog. + +### `lakefs/catalogexport/unity_exporter.register_tables(action, table_descriptors_path, delta_table_paths, databricks_client, warehouse_id)` + +The function used to register exported Delta Lake tables in Databricks' Unity Catalog. +The registration will use the following paths to register the table: +`..` where the branch name will be used as the schema name. +The return value is a table with mapping of table names to registration request status. + +Parameters: + +- `action(table)`: The global action table +- `table_descriptors_path(string)`: The path under which the table descriptors of the provided `table_paths` reside. +- `delta_table_paths(table)`: Table names to physical paths mapping (e.g. 
`{ table1 = "s3://mybucket/mytable1", table2 = "s3://mybucket/mytable2" }`) +- `databricks_client(table)`: A Databricks client that implements `create_or_get_schema: function(id, catalog_name)` and `register_external_table: function(table_name, physical_path, warehouse_id, catalog_name, schema_name)` +- `warehouse_id(string)`: Databricks warehouse ID. + +Example: +The following registers an exported Delta Lake table to Unity Catalog. + +```lua +local databricks = require("databricks") +local unity_export = require("lakefs/catalogexport/unity_exporter") + +local delta_table_locations = { + ["table1"] = "s3://mybucket/mytable1", +} +-- Register the exported table in Unity Catalog: +local action_details = { + repository_id = "my-repo", + commit_id = "commit_id", + branch_id = "main", +} +local databricks_client = databricks.client("", "") +local registration_statuses = unity_export.register_tables(action_details, "_lakefs_tables", delta_table_locations, databricks_client, "") + +for t, status in pairs(registration_statuses) do + print("Unity catalog registration for table \"" .. t .. "\" completed with status: " .. status .. "\n") +end +``` + +For the table descriptor under the `_lakefs_tables/delta-table-descriptor.yaml`: ```yaml --- -name: myTableActualName +name: my_table_name type: delta +path: path/to/delta/table/data +catalog: my-catalog ``` +For detailed step-by-step guide on how to use `unity_exporter.register_tables` as a part of a lakeFS action refer to +the [Unity Catalog docs]({% link integrations/unity_catalog.md %}). + ### `path/parse(path_string)` Returns a table for the given path string with the following structure: diff --git a/docs/integrations/unity_catalog.md b/docs/integrations/unity_catalog.md new file mode 100644 index 00000000000..3ec205c207d --- /dev/null +++ b/docs/integrations/unity_catalog.md @@ -0,0 +1,207 @@ +--- +title: Unity Catalog +description: Accessing lakeFS-exported Delta Lake tables from Unity Catalog. +parent: Integrations +redirect_from: /using/unity_catalog.html +--- + +# Using lakeFS with the Unity Catalog + +{% include toc_2-3.html %} + +## Overview + +Databricks Unity Catalog serves as a centralized data governance platform for your data lakes. +Through the Unity Catalog, you can search for and locate data assets across workspaces via a unified catalog. +Leveraging the external tables feature within Unity Catalog, you can register a Delta Lake table exported from lakeFS and +access it through the unified catalog. +The subsequent step-by-step guide will lead you through the process of configuring a [Lua hook]({% link howto/hooks/lua.md %}) +that exports Delta Lake tables from lakeFS, and subsequently registers them in Unity Catalog. + +{: .note} +> Currently, Unity Catalog export feature exclusively supports AWS S3 as the underlying storage solution. It's planned to [support other cloud providers soon](https://github.com/treeverse/lakeFS/issues/7199). + +## Prerequisites + +Before starting, ensure you have the following: + +1. Access to Unity Catalog +2. An active lakeFS installation with S3 as the backing storage, and a repository in this installation. +3. A Databricks SQL warehouse. +4. AWS Credentials with S3 access. +5. lakeFS credentials with access to your Delta Tables. + +### Databricks authentication + +Given that the hook will ultimately register a table in Unity Catalog, authentication with Databricks is imperative. +Make sure that: + +1. 
You have a Databricks [Service Principal](https://docs.databricks.com/en/dev-tools/service-principals.html). +2. The Service principal has [token usage permissions](https://docs.databricks.com/en/dev-tools/service-principals.html#step-3-assign-workspace-level-permissions-to-the-databricks-service-principal), + and an associated [token](https://docs.databricks.com/en/dev-tools/service-principals.html#step-4-generate-a-databricks-personal-access-token-for-the-databricks-service-principal) + configured. +3. The service principal has the `Service principal: Manager` privilege over itself (Workspace: Admin console -> Service principals -> `` -> Permissions -> Grant access (``: + `Service principal: Manager`), with `Workspace access` and `Databricks SQL access` checked (Admin console -> Service principals -> `` -> Configurations). +4. Your SQL warehouse allows the service principal to use it (SQL Warehouses -> `` -> Permissions -> ``: `Can use`). +5. The catalog grants the `USE CATALOG`, `USE SCHEMA`, `CREATE SCHEMA` permissions to the service principal(Catalog -> `` -> Permissions -> Grant -> ``: `USE CATALOG`, `USE SCHEMA`, `CREATE SCHEMA`). +6. You have an _External Location_ configured, and the service principal has the `CREATE EXTERNAL TABLE` permission over it (Catalog -> External Data -> External Locations -> Create location). + +## Guide + +### Table descriptor definition + +To guide the Unity Catalog exporter in configuring the table in the catalog, define its properties in the Delta Lake table descriptor. +The table descriptor should include (at minimum) the following fields: +1. `name`: The table name. +2. `type`: Should be `delta`. +3. `catalog`: The name of the catalog in which the table will be created. +4. `path`: The path in lakeFS (starting from the root of the branch) in which the Delta Lake table's data is found. + +Let's define the table descriptor and upload it to lakeFS: + +Save the following as `famous-people-td.yaml`: + +```yaml +--- +name: famous_people +type: delta +catalog: my-catalog-name +path: tables/famous-people +``` + +{: .note} +> It's recommended to create a Unity catalog with the same name as your repository + +Upload the table descriptor to `_lakefs_tables/famous-people-td.yaml` and commit: + +```bash +lakectl fs upload lakefs://repo/main/_lakefs_tables/famous-people-td.yaml -s ./famous-people-td.yaml && \ +lakectl commit lakefs://repo/main -m "add famous people table descriptor" +``` + +### Write some data + +Insert data into the table path, using your preferred method (e.g. [Spark]({% link integrations/spark.md %})), and commit upon completion. 
+ +We shall use Spark and lakeFS's S3 gateway to write some data as a Delta table: +```bash +pyspark --packages "io.delta:delta-spark_2.12:3.0.0,org.apache.hadoop:hadoop-aws:3.3.4,com.amazonaws:aws-java-sdk-bundle:1.12.262" \ + --conf spark.sql.extensions=io.delta.sql.DeltaSparkSessionExtension \ + --conf spark.sql.catalog.spark_catalog=org.apache.spark.sql.delta.catalog.DeltaCatalog \ + --conf spark.hadoop.fs.s3a.aws.credentials.provider='org.apache.hadoop.fs.s3a.SimpleAWSCredentialsProvider' \ + --conf spark.hadoop.fs.s3a.endpoint='' \ + --conf spark.hadoop.fs.s3a.access.key='' \ + --conf spark.hadoop.fs.s3a.secret.key='' \ + --conf spark.hadoop.fs.s3a.path.style.access=true +``` + +```python +data = [ + ('James','Bond','England','intelligence'), + ('Robbie','Williams','England','music'), + ('Hulk','Hogan','USA','entertainment'), + ('Mister','T','USA','entertainment'), + ('Rafael','Nadal','Spain','professional athlete'), + ('Paul','Haver','Belgium','music'), +] +columns = ["firstname","lastname","country","category"] +df = spark.createDataFrame(data=data, schema = columns) +df.write.format("delta").mode("overwrite").partitionBy("category", "country").save("s3a://repo/main/tables/famous-people") +``` + +### The Unity Catalog exporter script + +{: .note} +> For code references check [delta_exporter]({% link howto/hooks/lua.md %}#lakefscatalogexportdelta_exporter) and +[unity_exporter]({% link howto/hooks/lua.md %}#lakefscatalogexportunity_exporter) docs. + +Create `unity_exporter.lua`: + +```lua +local aws = require("aws") +local formats = require("formats") +local databricks = require("databricks") +local delta_export = require("lakefs/catalogexport/delta_exporter") +local unity_export = require("lakefs/catalogexport/unity_exporter") + +local sc = aws.s3_client(args.aws.access_key_id, args.aws.secret_access_key, args.aws.region) + +-- Export Delta Lake tables export: +local delta_client = formats.delta_client(args.lakefs.access_key_id, args.lakefs.secret_access_key, args.aws.region) +local delta_table_locations = delta_export.export_delta_log(action, args.table_defs, sc.put_object, delta_client, "_lakefs_tables") + +-- Register the exported table in Unity Catalog: +local databricks_client = databricks.client(args.databricks_host, args.databricks_token) +local registration_statuses = unity_export.register_tables(action, "_lakefs_tables", delta_table_locations, databricks_client, args.warehouse_id) + +for t, status in pairs(registration_statuses) do + print("Unity catalog registration for table \"" .. t .. "\" completed with commit schema status : " .. status .. "\n") +end +``` + +Upload the lua script to the `main` branch under `scripts/unity_exporter.lua` and commit: + +```bash +lakectl fs upload lakefs://repo/main/scripts/unity_exporter.lua -s ./unity_exporter.lua && \ +lakectl commit lakefs://repo/main -m "upload unity exporter script" +``` + +### Action configuration + +Define an action configuration that will run the above script after a commit is completed (`post-commit`) over the `main` branch. 
+ +Create `unity_exports_action.yaml`: + +```yaml +--- +name: unity_exports +on: + post-commit: + branches: ["main"] +hooks: + - id: unity_export + type: lua + properties: + script_path: scripts/unity_exporter.lua + args: + aws: + access_key_id: + secret_access_key: + region: + lakefs: # provide credentials of a user that has access to the script and Delta Table + access_key_id: + secret_access_key: + table_defs: # an array of table descriptors used to be defined in Unity Catalog + - famous-people-td + databricks_host: + databricks_token: + warehouse_id: +``` + +Upload the action configurations to `_lakefs_actions/unity_exports_action.yaml` and commit: + +{: .note} +> Once the commit will finish its run, the action will start running since we've configured it to run on `post-commit` +events on the `main` branch. + +```bash +lakectl fs upload lakefs://repo/main/_lakefs_actions/unity_exports_action.yaml -s ./unity_exports_action.yaml && \ +lakectl commit lakefs://repo/main -m "upload action and run it" +``` + +The action has run and exported the `famous_people` Delta Lake table to the repo's storage namespace, and has register +the table as an external table in Unity Catalog under the catalog `my-catalog-name`, schema `main` (as the branch's name) and +table name `famous_people`: `my-catalog-name.main.famous_people`. + +![Hooks log result in lakeFS UI]({{ site.baseurl }}/assets/img/unity_export_hook_result_log.png) + +### Databricks Integration + +After registering the table in Unity, you can leverage your preferred method to [query the data](https://docs.databricks.com/en/query/index.html) +from the exported table under `my-catalog-name.main.famous_people`, and view it in the Databricks's Catalog Explorer, or +retrieve it using the Databricks CLI with the following command: +```bash +databricks tables get my-catalog-name.main.famous_people +``` + +![Unity Catalog Explorer view]({{ site.baseurl }}/assets/img/unity_exported_table_columns.png) diff --git a/examples/hooks/delta_lake_S3_export.lua b/examples/hooks/delta_lake_S3_export.lua index 88b01030f5b..8b92366b8a5 100644 --- a/examples/hooks/delta_lake_S3_export.lua +++ b/examples/hooks/delta_lake_S3_export.lua @@ -1,6 +1,6 @@ --[[ args: - - table_paths (e.g. ["path/to/table1", "path/to/table2", ...]) + - table_defs (e.g. ["table1.yaml", "table2", ...]) - lakefs.access_key_id - lakefs.secret_access_key - aws.access_key_id @@ -16,7 +16,7 @@ local table_descriptors_path = "_lakefs_tables" local sc = aws.s3_client(args.aws.access_key_id, args.aws.secret_access_key, args.aws.region) local delta_client = formats.delta_client(args.lakefs.access_key_id, args.lakefs.secret_access_key, args.aws.region) -local delta_table_locations = delta_export.export_delta_log(action, args.table_paths, sc.put_object, delta_client, table_descriptors_path) +local delta_table_locations = delta_export.export_delta_log(action, args.table_defs, sc.put_object, delta_client, table_descriptors_path) for t, loc in pairs(delta_table_locations) do print("Delta Lake exported table \"" .. t .. "\"'s location: " .. loc .. 
"\n") end diff --git a/examples/hooks/unity_table_export.lua b/examples/hooks/unity_table_export.lua new file mode 100644 index 00000000000..e5d7a29c58e --- /dev/null +++ b/examples/hooks/unity_table_export.lua @@ -0,0 +1,24 @@ +--[[ + As an exhaustive example, it will first start off with a Delta Lake tables export, then continue to register the table + with Unity Catalog +]] + +local aws = require("aws") +local formats = require("formats") +local databricks = require("databricks") +local delta_export = require("lakefs/catalogexport/delta_exporter") +local unity_export = require("lakefs/catalogexport/unity_exporter") + +local sc = aws.s3_client(args.aws.access_key_id, args.aws.secret_access_key, args.aws.region) + +-- Export Delta Lake tables export: +local delta_client = formats.delta_client(args.lakefs.access_key_id, args.lakefs.secret_access_key, args.aws.region) +local delta_table_locations = delta_export.export_delta_log(action, args.table_defs, sc.put_object, delta_client, "_lakefs_tables") + +-- Register the exported table in Unity Catalog: +local databricks_client = databricks.client(args.databricks_host, args.databricks_token) +local registration_statuses = unity_export.register_tables(action, "_lakefs_tables", delta_table_locations, databricks_client, args.warehouse_id) + +for t, status in pairs(registration_statuses) do + print("Unity catalog registration for table \"" .. t .. "\" completed with status: " .. status .. "\n") +end diff --git a/pkg/actions/lua/databricks/client.go b/pkg/actions/lua/databricks/client.go index 6a5e99c455e..d42c6fc1509 100644 --- a/pkg/actions/lua/databricks/client.go +++ b/pkg/actions/lua/databricks/client.go @@ -112,6 +112,7 @@ func newDatabricksClient(l *lua.State) (*databricks.WorkspaceClient, error) { func (client *Client) RegisterExternalTable(l *lua.State) int { tableName := lua.CheckString(l, 1) + tableName = strings.ReplaceAll(tableName, "-", "_") location := lua.CheckString(l, 2) warehouseID := lua.CheckString(l, 3) catalogName := lua.CheckString(l, 4) diff --git a/pkg/actions/lua/lakefs/catalogexport/delta_exporter.lua b/pkg/actions/lua/lakefs/catalogexport/delta_exporter.lua index 6f6da379dea..f872b75ead0 100644 --- a/pkg/actions/lua/lakefs/catalogexport/delta_exporter.lua +++ b/pkg/actions/lua/lakefs/catalogexport/delta_exporter.lua @@ -3,7 +3,7 @@ local pathlib = require("path") local json = require("encoding/json") local utils = require("lakefs/catalogexport/internal") local extractor = require("lakefs/catalogexport/table_extractor") - +local strings = require("strings") --[[ delta_log_entry_key_generator returns a closure that returns a Delta Lake version key according to the Delta Lake protocol: https://github.com/delta-io/delta/blob/master/PROTOCOL.md#delta-log-entries @@ -33,7 +33,7 @@ end - repository_id - commit_id - table_paths: ["path/to/table1", "path/to/table2", ...] + table_def_names: ["table1.yaml", "table2", ...] write_object: function(bucket, key, data) @@ -41,17 +41,37 @@ end - get_table: function(repo, ref, prefix) ]] -local function export_delta_log(action, table_paths, write_object, delta_client, table_descriptors_path) +local function export_delta_log(action, table_def_names, write_object, delta_client, table_descriptors_path) local repo = action.repository_id local commit_id = action.commit_id - + if not commit_id then + error("missing commit id") + end local ns = action.storage_namespace if ns == nil then error("failed getting storage namespace for repo " .. 
repo) end local response = {} - for _, path in ipairs(table_paths) do - local t = delta_client.get_table(repo, commit_id, path) + for _, table_name_yaml in ipairs(table_def_names) do + + -- Get the table descriptor + local tny = table_name_yaml + if not strings.has_suffix(tny, ".yaml") then + tny = tny .. ".yaml" + end + local table_src_path = pathlib.join("/", table_descriptors_path, tny) + local table_descriptor = extractor.get_table_descriptor(lakefs, repo, commit_id, table_src_path) + local table_path = table_descriptor.path + if not table_path then + error("table path is required to proceed with Delta catalog export") + end + local table_name = table_descriptor.name + if not table_name then + error("table name is required to proceed with Delta catalog export") + end + + -- Get Delta table + local t = delta_client.get_table(repo, commit_id, table_path) local sortedKeys = utils.sortedKeys(t) --[[ Pairs of (version, map of json content): (1, @@ -82,7 +102,7 @@ local function export_delta_log(action, table_paths, write_object, delta_client, p = entry.remove.path end if p ~= "" then - local code, obj = lakefs.stat_object(repo, commit_id, pathlib.join("/",path, p)) + local code, obj = lakefs.stat_object(repo, commit_id, pathlib.join("/", table_path, p)) if code == 200 then local obj_stat = json.unmarshal(obj) local physical_path = obj_stat["physical_address"] @@ -99,13 +119,6 @@ local function export_delta_log(action, table_paths, write_object, delta_client, table_log[keyGenerator()] = entry_log end - -- Get the table delta log physical location - local table_src_path = pathlib.join("/", table_descriptors_path, path .. ".yaml") - local table_descriptor = extractor.get_table_descriptor(lakefs, repo, commit_id, table_src_path) - local table_name = table_descriptor.name - if not table_name then - error("table name is required to proceed with Delta catalog export") - end local table_export_prefix = utils.get_storage_uri_prefix(ns, commit_id, action) local table_physical_path = pathlib.join("/", table_export_prefix, table_name) local table_log_physical_path = pathlib.join("/", table_physical_path, "_delta_log") @@ -131,7 +144,7 @@ local function export_delta_log(action, table_paths, write_object, delta_client, local version_key = storage_props.key .. "/" .. entry_version write_object(storage_props.bucket, version_key, table_entry_string) end - response[path] = table_physical_path + response[table_name_yaml] = table_physical_path end return response end diff --git a/pkg/actions/lua/lakefs/catalogexport/internal.lua b/pkg/actions/lua/lakefs/catalogexport/internal.lua index 7125cab9b56..1db29e312c7 100644 --- a/pkg/actions/lua/lakefs/catalogexport/internal.lua +++ b/pkg/actions/lua/lakefs/catalogexport/internal.lua @@ -1,7 +1,5 @@ local url = require("net/url") local pathlib = require("path") -local lakefs = require("lakefs") -local json = require("encoding/json") local DEFAULT_SHORT_DIGEST_LEN=6 local function deepcopy(orig) diff --git a/pkg/actions/lua/lakefs/catalogexport/unity_exporter.lua b/pkg/actions/lua/lakefs/catalogexport/unity_exporter.lua new file mode 100644 index 00000000000..1d961ce23f1 --- /dev/null +++ b/pkg/actions/lua/lakefs/catalogexport/unity_exporter.lua @@ -0,0 +1,61 @@ +--[[ TABLE SPECIFICATION: _lakefs_tables/ +name:
+type: delta
+catalog: <catalog name>
+]]
+local strings = require("strings")
+local pathlib = require("path")
+local lakefs = require("lakefs")
+local extractor = require("lakefs/catalogexport/table_extractor")
+--[[
+    - table_descriptors_path: the path under which the table descriptors reside (e.g. "_lakefs_tables").
+      Every table name in the provided `delta_table_paths` must have a matching
+      `<table_descriptors_path>/<table name>.yaml` file describing the exported Delta Lake table.
+    - delta_table_paths: a mapping of Delta Lake table descriptor YAML names (with or without the ".yaml" extension) to their locations in the object storage:
+      { <table name>: <physical path> }
+    - databricks_client: a client to interact with Databricks.
+    - warehouse_id: Databricks warehouse ID
+
+    Returns a "<table name>
: status" map for registration of provided tables. +]] +local function register_tables(action, table_descriptors_path, delta_table_paths, databricks_client, warehouse_id) + local repo = action.repository_id + local commit_id = action.commit_id + if not commit_id then + error("missing commit id") + end + local branch_id = action.branch_id + local response = {} + for table_name_yaml, physical_path in pairs(delta_table_paths) do + local tny = table_name_yaml + if not strings.has_suffix(tny, ".yaml") then + tny = tny .. ".yaml" + end + local table_src_path = pathlib.join("/", table_descriptors_path, tny) + local table_descriptor = extractor.get_table_descriptor(lakefs, repo, commit_id, table_src_path) + local table_name = table_descriptor.name + if not table_name then + error("table name is required to proceed with unity catalog export") + end + if table_descriptor.type ~= "delta" then + error("unity exporter supports only table descriptors of type 'delta'. registration failed for table " .. table_name) + end + local catalog = table_descriptor.catalog + if not catalog then + error("catalog name is required to proceed with unity catalog export") + end + local get_schema_if_exists = true + local schema_name = databricks_client.create_schema(branch_id, catalog, get_schema_if_exists) + if not schema_name then + error("failed creating/getting catalog's schema: " .. catalog .. "." .. branch_id) + end + local status = databricks_client.register_external_table(table_name, physical_path, warehouse_id, catalog, schema_name) + response[table_name_yaml] = status + end + return response +end + + +return { + register_tables = register_tables, +} diff --git a/pkg/actions/lua_test.go b/pkg/actions/lua_test.go index 976632e911f..78449c6d6dd 100644 --- a/pkg/actions/lua_test.go +++ b/pkg/actions/lua_test.go @@ -367,9 +367,12 @@ func TestLuaRunTable(t *testing.T) { Output: "testdata/lua/catalogexport_hive_partition_pager.output", }, { - Name: "catalogexport_delta", - Input: "testdata/lua/catalogexport_delta.lua", - Output: "", + Name: "catalogexport_delta", + Input: "testdata/lua/catalogexport_delta.lua", + }, + { + Name: "catalogexport_unity", + Input: "testdata/lua/catalogexport_unity.lua", }, } diff --git a/pkg/actions/testdata/lua/catalogexport_delta.lua b/pkg/actions/testdata/lua/catalogexport_delta.lua index a73592cc689..82852f8d779 100644 --- a/pkg/actions/testdata/lua/catalogexport_delta.lua +++ b/pkg/actions/testdata/lua/catalogexport_delta.lua @@ -1,6 +1,7 @@ local pathlib = require("path") local json = require("encoding/json") local utils = require("lakefs/catalogexport/internal") +local strings = require("strings") local test_data = { @@ -44,10 +45,12 @@ end package.loaded["lakefs/catalogexport/table_extractor"] = { get_table_descriptor = function(_, _, _, table_src_path) local t_name_yaml = pathlib.parse(table_src_path) - assert(t_name_yaml["base_name"] == ".yaml") - local t_name = pathlib.parse(t_name_yaml["parent"]) + local t_name_yaml_base = t_name_yaml["base_name"] + assert(strings.has_suffix(t_name_yaml_base, ".yaml")) + local t_name = strings.split(t_name_yaml_base, ".")[1] return { - name = t_name["base_name"] + name = t_name, + path = t_name } end } @@ -56,6 +59,9 @@ package.loaded.lakefs = { stat_object = function(_, _, path) local parsed_path = pathlib.parse(path) local table_path_base = parsed_path["parent"] + if strings.has_suffix(table_path_base, "/") then + table_path_base = strings.split(table_path_base, "/")[1] + end if not test_data.table_to_objects[table_path_base] then 
test_data.table_to_objects[table_path_base] = {} end @@ -102,8 +108,8 @@ local function assert_physical_address(delta_table_locations, table_paths) end end -local function assert_lakefs_stats(table_paths, content_paths) - for _, table_path in ipairs(table_paths) do +local function assert_lakefs_stats(table_names, content_paths) + for _, table_path in ipairs(table_names) do local table = test_data.table_to_objects[table_path] if not table then error("missing lakeFS stat_object call for table path: " .. table_path .. "\n") @@ -143,11 +149,10 @@ end -- Test data local data_paths = { "part-c000.snappy.parquet", "part-c001.snappy.parquet", "part-c002.snappy.parquet", "part-c003.snappy.parquet" } -local test_table_paths = {"path/to/table1/", "path/to/table2/"} +local test_table_names = { "table1", "table2"} -for _, table_path in ipairs(test_table_paths) do - local table_name = pathlib.parse(table_path)["base_name"] - test_data.table_logs_content[table_path] = { +for _, table_name in ipairs(test_table_names) do + test_data.table_logs_content[table_name] = { ["_delta_log/00000000000000000000.json"] = { "{\"commitInfo\":\"some info\"}", "{\"add\": {\"path\":\"part-c000.snappy.parquet\"}}", @@ -163,14 +168,14 @@ for _, table_path in ipairs(test_table_paths) do test_data.table_expected_log[table_name] = { ["_delta_log/00000000000000000000.json"] = { "{\"commitInfo\":\"some info\"}", - "{\"add\":{\"path\":\"" .. generate_physical_address(table_path .. "part-c000.snappy.parquet") .. "\"}}", - "{\"remove\":{\"path\":\"" .. generate_physical_address(table_path .. "part-c001.snappy.parquet") .. "\"}}", + "{\"add\":{\"path\":\"" .. generate_physical_address(table_name .. "/part-c000.snappy.parquet") .. "\"}}", + "{\"remove\":{\"path\":\"" .. generate_physical_address(table_name .. "/part-c001.snappy.parquet") .. "\"}}", "{\"protocol\":\"the protocol\"}", }, ["_delta_log/00000000000000000001.json"] = { "{\"metaData\":\"some metadata\"}", - "{\"add\":{\"path\":\"" .. generate_physical_address(table_path .. "part-c002.snappy.parquet") .. "\"}}", - "{\"remove\":{\"path\":\"" .. generate_physical_address(table_path .. "part-c003.snappy.parquet") .. "\"}}", + "{\"add\":{\"path\":\"" .. generate_physical_address(table_name .. "/part-c002.snappy.parquet") .. "\"}}", + "{\"remove\":{\"path\":\"" .. generate_physical_address(table_name .. "/part-c003.snappy.parquet") .. 
"\"}}", } } end @@ -179,13 +184,13 @@ end -- Run Delta export test local delta_table_locations = delta_export.export_delta_log( action, - test_table_paths, + test_table_names, mock_object_writer, mock_delta_client(test_data.table_logs_content), "some_path" ) -- Test results -assert_lakefs_stats(test_table_paths, data_paths) -assert_physical_address(delta_table_locations, test_table_paths) +assert_lakefs_stats(test_table_names, data_paths) +assert_physical_address(delta_table_locations, test_table_names) assert_delta_log_content(delta_table_locations, test_data.table_expected_log) diff --git a/pkg/actions/testdata/lua/catalogexport_unity.lua b/pkg/actions/testdata/lua/catalogexport_unity.lua new file mode 100644 index 00000000000..ebf6330bdbd --- /dev/null +++ b/pkg/actions/testdata/lua/catalogexport_unity.lua @@ -0,0 +1,170 @@ +local action = { + repository_id = "myRepo", + commit_id = "myCommit", + branch_id = "myBranch", +} + +-- table names must be unique +local test_cases = { + { + name = "failed_not_delta_type", + tables = { + ["my_table_not_delta"] = { + td = { + type = "notDelta", + catalog = "ok", + name = "notDelta", + }, + }, + }, + error = "registration failed", + }, + { + name = "failed_no_catalog_name", + tables = { + ["my_table_no_catalog"] = { + td = { + type = "delta", + name = "noCatalog", + }, + }, + }, + error = "catalog name is required", + }, + { + name = "failed_no_name", + tables = { + ["my_table_no_name"] = { + td = { + type = "delta", + catalog = "ok", + }, + }, + }, + error = "table name is required", + }, + { + name = "failed_schema_creation", + tables = { + ["my_table_schema_failure"] = { + td = { + type = "delta", + catalog = "ok", + name = "schemaFailure", + }, + }, + }, + schema_failure = true, + error = "failed creating/getting catalog's schema", + }, + { + name = "success_all_tables", + tables = { + ["my_table_success"] = { + status = "SUCCEEDED", + }, + ["my_table2_success"] = { + status = "SUCCEEDED", + }, + ["my_table3_success"] = { + status = "SUCCEEDED", + }, + }, + }, + { + name = "mixed_statuses", + tables = { + ["my_table_failure"] = { + status = "FAILED", + }, + ["my_table2_success_2"] = { + status = "SUCCEEDED", + }, + ["my_table3_failure"] = { + status = "FAILED", + }, + }, + }, +} + +-- Loads a mock table (descriptor) extractor +local function load_table_descriptor(tables) + package.loaded["lakefs/catalogexport/table_extractor"] = { + get_table_descriptor = function(_, _, _, table_src_path) + local examined_tables = {} + for name, t in pairs(tables) do + table.insert(examined_tables, name) + if string.find(table_src_path, name) then + if not t.td then + return { + type = "delta", + catalog = "ok", + name = name + } + end + return t.td + end + end + error("test was configured incorrectly. expected to find a table descriptor for table \"" .. table_src_path .. "\" but no such was found." ) + end + } +end + +-- Generates a mock databricks client +local function db_client(schema_failure, tables) + return { + create_schema = function(branch_id, catalog, _) + if schema_failure then + return nil + end + return catalog .. "." .. 
branch_id + end, + register_external_table = function(table_name, _, _, _, _) + for name, t in pairs(tables) do + if name == table_name then + return t.status + end + end + end + } +end + +--------------------------------- +---------- Begin tests ---------- +--------------------------------- +for _, test in ipairs(test_cases) do + package.loaded["lakefs/catalogexport/unity_exporter"] = nil + load_table_descriptor(test.tables) + local unity_export = require("lakefs/catalogexport/unity_exporter") + local err = test.error + local schema_failure = test.schema_failure + local test_tables = test.tables + local table_paths = {} + for name, _ in pairs(test_tables) do + table_paths[name] = "s3://physical/" .. name + end + + local db = db_client(schema_failure, test_tables) + -- Run test: + local s, resp = pcall(unity_export.register_tables, action, "_lakefs_tables", table_paths, db, "id") + if err ~= nil then + if s ~= false then -- the status is true which means no error was returned + local str_resp = "" + for k, v in pairs(resp) do + str_resp = str_resp .. k .. " = " .. v .. "\n" + end + error("test " .. test.name .. " expected an error:\n" .. err .. "\nbut returned status: \"" .. tostring(s) .. "\"\nresponse:\n" .. str_resp) + end + -- status is false as expected -> error returned + if string.find(resp, err) == nil then + error("test " .. test.name .. " returned incorrect error.\nexpected:\n" .. err .. "\nactual:\n" .. resp) + end + else + for table_name, status in pairs(resp) do + local expected_status = test.tables[table_name].status + if expected_status ~= status then + error("test " .. test.name .. " returned incorrect status for table \"" .. table_name .."\"\nexpected: \"" .. expected_status .. "\"\nactual:\n\"" .. status .. "\"") + end + end + end +end