Unity catalog export implementation (#7167)
* add databricks client for lua

* add databricks client

* remove redundant comment

* register databricks client

* open services for lua

* linter

* Unity export implementation

* remove unnecessary imports

* PR changes

* sort imports

* createOrGetSchema

* remove redundant code, extract full name creation procedure

* fullName -> tableFullName

* remove unnecessary assignment

* change structure of databricks lua

* update unity exporter example

* linter

* linter

* add validation to databricks client

* linter

* handle error if databricks client couldn't be initialized

* PR changes

* update unity exporter to use the updated databricks method

* update delta exporter

* change unity exporter to use table names instead of logical path

* fix delta exporter tests

* give descriptive regex name. fix error returning format. extract "alreadyExists" function. some reformatting

* change databricks client lua client functions to be a part of the Go client

* use table descriptor table name. pass table definition file names instead of table names

* validate table name at the correct place. fix delta export test

* PR fixes

* pass "get_schema_if_exists" to create_or_get_schema

* rename function

* use new "create schema" name

* change OptString -> ToBoolean

* delete unused sentinel error

* add names to errors

* validate commit id

* Lua: Insert format to all `lua.Errorf` calls (#7189)

* Unity catalog exporter: tests (#7176)

* add tests for unity catalog exporter

* Beautify comment

* remove false test

* Revert "remove false test"

This reverts commit ad13f3c.

* add name to mock table descriptor

* update tests to new databricks client

* PR fix

* fix returning error

* Unity Catalog Export: Documentation (#7183)

* add unity_exporter.register_tables doc entry

* add docs to unity catalog

* add catalog permissions config

* fix lua docs

* add to unity_catalog docs

* document unity catalog exporter steps

* rephrase Unity Catalog exporter docs

* fix `unity_exporter.register_tables` docs

* rephrase the unity catalog exporter section in `catalog_exports`

* remove unnecessary numbering

* add types and fix docs

* PR fixes

* Note that only AWS S3 is supported with Unity Catalog export

* Change the supported exporter section to a table
Jonathan-Rosenberg authored Dec 24, 2023
1 parent 2b11876 commit 58a730b
Showing 14 changed files with 699 additions and 45 deletions.
Binary file added docs/assets/img/unity_export_hook_result_log.png
Binary file added docs/assets/img/unity_exported_table_columns.png
11 changes: 7 additions & 4 deletions docs/howto/catalog_exports.md
@@ -76,9 +76,12 @@ Exporters are code packages accessible through [Lua integration]({% link howto/h

#### Currently supported exporters

- Symlink Exporter: Writes metadata for the table using Hive's [SymlinkTextInputFormat](https://svn.apache.org/repos/infra/websites/production/hive/content/javadocs/r2.1.1/api/org/apache/hadoop/hive/ql/io/SymlinkTextInputFormat.html)
- AWS Glue Catalog (+ Athena) Exporter: Creates a table in Glue using Hive's format and updates the location to symlink files (reuses Symlink Exporter).
- See a step by step guide on how to integrate with [Glue Exporter]({% link integrations/glue_metastore.md %})
| Exporter | Description | Notes |
|:-----------------------------------------|:------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------|:---------------------------------------------------------------------------------------------------------------------------------------------------------------------|
| **Symlink exporter** | Writes metadata for the table using Hive's [SymlinkTextInputFormat](https://svn.apache.org/repos/infra/websites/production/hive/content/javadocs/r2.1.1/api/org/apache/hadoop/hive/ql/io/SymlinkTextInputFormat.html) | |
| **AWS Glue Catalog (+ Athena) exporter** | Creates a table in Glue using Hive's format and updates the location to symlink files (reuses Symlink Exporter). | See a step-by-step guide on how to integrate with [Glue Exporter]({% link integrations/glue_metastore.md %}) |
| **Delta Lake table exporter**             | Exports Delta Lake tables from lakeFS to external storage. | |
| **Unity Catalog exporter**                | Registers a Delta Lake table exported from lakeFS in Unity Catalog. It works in conjunction with the Delta Lake exporter: the Delta Lake exporter exports a Delta Lake table from lakeFS, and its output is then passed to the Unity Catalog exporter, which registers it in Unity Catalog. | See a step-by-step guide on how to integrate with [Unity Catalog Exporter]({% link integrations/unity_catalog.md %})</br>Currently, only AWS S3 storage is supported |

#### Running an Exporter

@@ -139,4 +142,4 @@ sequenceDiagram
Exporter->>Catalog: register object store location
Query Engine-->Catalog: Query
Query Engine-->Object Store: Query
```
```
175 changes: 172 additions & 3 deletions docs/howto/hooks/lua.md
@@ -220,6 +220,57 @@ Returns the MD5 digest (string) of the given data

Returns the SHA256 digest (string) of the given data

### `databricks/client(databricks_host, databricks_service_principal_token)`

Returns a table representing a Databricks client with the `register_external_table` and `create_schema` methods.

### `databricks/client.create_schema(schema_name, catalog_name, get_if_exists)`

Creates a schema in the configured Databricks host's Unity Catalog, or retrieves it if it already exists.
If the schema doesn't exist, a new schema with the given `schema_name` is created under the given `catalog_name`.
Returns the name of the created (or fetched) schema.

Parameters:

- `schema_name(string)`: The required schema name
- `catalog_name(string)`: The catalog name under which the schema will be created (or from which it will be fetched)
- `get_if_exists(boolean)`: If a schema named `schema_name` already exists under the given `catalog_name`, return it instead of failing.

Example:

```lua
local databricks = require("databricks")
local client = databricks.client("https://my-host.cloud.databricks.com", "my-service-principal-token")
local schema_name = client.create_schema("main", "mycatalog", true)
```

### `databricks/client.register_external_table(table_name, physical_path, warehouse_id, catalog_name, schema_name)`

Registers an external table under the provided warehouse ID, catalog name, and schema name.
For this method call to succeed, an external location must be configured in the catalog for the `physical_path`'s root
storage URI (for example: `s3://mybucket`).
Returns the table's creation status.

Parameters:

- `table_name(string)`: Table name.
- `physical_path(string)`: A location to which the external table will refer, e.g. `s3://mybucket/the/path/to/mytable`.
- `warehouse_id(string)`: The SQL warehouse ID used in Databricks to run the `CREATE TABLE` query (fetched from the SQL warehouse
`Connection Details`, or by running `databricks warehouses get`, choosing your SQL warehouse and fetching its ID).
- `catalog_name(string)`: The name of the catalog under which a schema will be created (or fetched from).
- `schema_name(string)`: The name of the schema under which the table will be created.

Example:

```lua
local databricks = require("databricks")
local client = databricks.client("https://my-host.cloud.databricks.com", "my-service-principal-token")
local status = client.register_external_table("mytable", "s3://mybucket/the/path/to/mytable", "my-warehouse-id", "my-catalog-name", "myschema")
```

- For the Databricks permissions needed to run this method, check out the [Unity Catalog Exporter]({% link integrations/unity_catalog.md %}) docs.

### `encoding/base64/encode(data)`

Encodes the given data to a base64 string
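
For example (a small sketch, assuming the module is loaded via `require("encoding/base64")`, matching the path in the heading above):

```lua
local base64 = require("encoding/base64")

local encoded = base64.encode("hello world")
print(encoded) -- base64 of the input, e.g. "aGVsbG8gd29ybGQ="
```
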
@@ -348,6 +399,76 @@ Returns an object-wise diff of uncommitted changes on `branch_id`.

Returns a stat object for the given path under the given reference and repository.

### `lakefs/catalogexport/glue_exporter.get_full_table_name(descriptor, action_info)`

Generates the Glue table name for the given table descriptor and action.

Parameters:

- `descriptor(Table)`: The table descriptor object (e.g. as parsed from `_lakefs_tables/my_table.yaml`).
- `action_info(Table)`: The global action object.
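
A minimal usage sketch (the descriptor fields shown are illustrative, and `action` is the global action object available to the hook script):

```lua
local glue_exporter = require("lakefs/catalogexport/glue_exporter")

-- Illustrative descriptor, as it would be parsed from _lakefs_tables/my_table.yaml
local descriptor = {
  name = "my_table",
  type = "hive",
  path = "tables/my_table",
}

local glue_table_name = glue_exporter.get_full_table_name(descriptor, action)
print("Glue table name: " .. glue_table_name)
```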

### `lakefs/catalogexport/delta_exporter`

A package used to export Delta Lake tables from lakeFS to external cloud storage.

### `lakefs/catalogexport/delta_exporter.export_delta_log(action, table_names, writer, delta_client, table_descriptors_path)`

Exports the given Delta Lake tables from lakeFS.
Returns a table mapping each exported table name to its external location (from which the data can be queried).

Parameters:

- `action`: The global action object
- `table_names`: A list of Delta table names to export (e.g. `{"table1", "table2"}`)
- `writer`: A writer function with `function(bucket, key, data)` signature, used to write the exported Delta Log (e.g. `aws/s3.s3_client.put_object`)
- `delta_client`: A Delta Lake client that implements `get_table: function(repo, ref, prefix)`
- `table_descriptors_path`: The path under which the table descriptors of the provided `table_names` reside

Example:

```yaml
---
name: delta_exporter
on:
  post-commit: null
hooks:
  - id: delta_export
    type: lua
    properties:
      script: |
        local aws = require("aws")
        local formats = require("formats")
        local delta_exporter = require("lakefs/catalogexport/delta_exporter")
        local table_descriptors_path = "_lakefs_tables"
        local sc = aws.s3_client(args.aws.access_key_id, args.aws.secret_access_key, args.aws.region)
        local delta_client = formats.delta_client(args.lakefs.access_key_id, args.lakefs.secret_access_key, args.aws.region)
        local delta_table_locations = delta_exporter.export_delta_log(action, args.table_names, sc.put_object, delta_client, table_descriptors_path)
        for t, loc in pairs(delta_table_locations) do
          print("Delta Lake exported table \"" .. t .. "\"'s location: " .. loc .. "\n")
        end
      args:
        aws:
          access_key_id: <AWS_ACCESS_KEY_ID>
          secret_access_key: <AWS_SECRET_ACCESS_KEY>
          region: us-east-1
        lakefs:
          access_key_id: <LAKEFS_ACCESS_KEY_ID>
          secret_access_key: <LAKEFS_SECRET_ACCESS_KEY>
        table_names:
          - mytable
```

For the table descriptor under the `_lakefs_tables/mytable.yaml`:
```yaml
---
name: myTableActualName
type: delta
path: a/path/to/my/delta/table
```

### `lakefs/catalogexport/table_extractor`

Utility package to parse `_lakefs_tables/` descriptors.
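
As a rough sketch of loading a descriptor from a hook script (the `get_table_descriptor` helper and its signature are assumptions for illustration, not a confirmed API):

```lua
local lakefs = require("lakefs")
local extractor = require("lakefs/catalogexport/table_extractor")

-- Assumed helper: fetches and parses _lakefs_tables/my_table.yaml at the commit
-- the hook ran on. The function name and arguments here are illustrative only.
local descriptor = extractor.get_table_descriptor(lakefs, action.repository_id, action.commit_id, "_lakefs_tables/my_table.yaml")
print("table name: " .. descriptor.name)
```
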
@@ -506,7 +627,7 @@ Example:

```yaml
---
name: test_delta_exporter
name: delta_exporter
on:
post-commit: null
hooks:
@@ -538,13 +659,61 @@ hooks:
- my/delta/table/path
```

For the table descriptor under the `_lakefs_tables/my/delta/table/path.yaml`:
### `lakefs/catalogexport/unity_exporter`

A package used to register exported Delta Lake tables in Databricks' Unity Catalog.

### `lakefs/catalogexport/unity_exporter.register_tables(action, table_descriptors_path, delta_table_paths, databricks_client, warehouse_id)`

The function used to register exported Delta Lake tables in Databricks' Unity Catalog.
Tables are registered under `<catalog>.<branch name>.<table_name>`, where the branch name serves as the schema name.
Returns a table mapping each table name to its registration request status.

Parameters:

- `action(table)`: The global action table
- `table_descriptors_path(string)`: The path under which the table descriptors of the provided `delta_table_paths` reside.
- `delta_table_paths(table)`: A mapping of table names to their physical paths (e.g. `{ table1 = "s3://mybucket/mytable1", table2 = "s3://mybucket/mytable2" }`)
- `databricks_client(table)`: A Databricks client that implements `create_schema: function(schema_name, catalog_name, get_if_exists)` and `register_external_table: function(table_name, physical_path, warehouse_id, catalog_name, schema_name)`
- `warehouse_id(string)`: Databricks warehouse ID.

Example:
The following example registers an exported Delta Lake table in Unity Catalog.

```lua
local databricks = require("databricks")
local unity_export = require("lakefs/catalogexport/unity_exporter")
local delta_table_locations = {
["table1"] = "s3://mybucket/mytable1",
}
-- Register the exported table in Unity Catalog:
local action_details = {
repository_id = "my-repo",
commit_id = "commit_id",
branch_id = "main",
}
local databricks_client = databricks.client("<DATABRICKS_HOST>", "<DATABRICKS_TOKEN>")
local registration_statuses = unity_export.register_tables(action_details, "_lakefs_tables", delta_table_locations, databricks_client, "<WAREHOUSE_ID>")
for t, status in pairs(registration_statuses) do
print("Unity catalog registration for table \"" .. t .. "\" completed with status: " .. status .. "\n")
end
```

For the table descriptor under the `_lakefs_tables/delta-table-descriptor.yaml`:
```yaml
---
name: myTableActualName
name: my_table_name
type: delta
path: path/to/delta/table/data
catalog: my-catalog
```

For a detailed step-by-step guide on how to use `unity_exporter.register_tables` as part of a lakeFS action, refer to
the [Unity Catalog docs]({% link integrations/unity_catalog.md %}).

### `path/parse(path_string)`

Returns a table for the given path string with the following structure: