Commit

Change the delta lake catalog export behavior to read a table descriptor (#7166)

* change the delta lake catalog export behavior to read a table descriptor

* remove comment

* fix delta export test
Jonathan-Rosenberg authored Dec 14, 2023
1 parent 384de89 commit 04466af
Showing 4 changed files with 47 additions and 20 deletions.
28 changes: 18 additions & 10 deletions docs/howto/hooks/lua.md
@@ -406,11 +406,11 @@ Export Symlink files that represent a table to S3 location.
Parameters:

- `s3_client`: Configured client.
- `table_src_path(string)`: Path to the table spec YAML file in `_lakefs_tables` (i.e _lakefs_tables/my_table.yaml).
- `table_src_path(string)`: Path to the table spec YAML file in `_lakefs_tables` (e.g. _lakefs_tables/my_table.yaml).
- `action_info(table)`: The global action object.
- `options(table)`:
- `debug(boolean)`: Print extra info.
- `export_base_uri(string)`: Override the prefix in S3 i.e `s3://other-bucket/path/`.
- `export_base_uri(string)`: Override the prefix in S3 e.g. `s3://other-bucket/path/`.
- `writer(function(bucket, key, data))`: If passed then will not use s3 client, helpful for debug.

Example:
@@ -437,15 +437,15 @@ Parameters:

- `glue`: AWS glue client
- `db(string)`: glue database name
- `table_src_path(string)`: path to table spec (i.e _lakefs_tables/my_table.yaml)
- `table_src_path(string)`: path to table spec (e.g. _lakefs_tables/my_table.yaml)
- `create_table_input(Table)`: Input that maps to [table_input](https://docs.aws.amazon.com/glue/latest/webapi/API_CreateTable.html#API_CreateTable_RequestSyntax) in AWS, the same one used for `glue.create_table`.
should contain inputs describing the data format (i.e InputFormat, OutputFormat, SerdeInfo) since the exporter is agnostic to this.
should contain inputs describing the data format (e.g. InputFormat, OutputFormat, SerdeInfo) since the exporter is agnostic to this.
by default this function will configure table location and schema.
- `action_info(Table)`: the global action object.
- `options(Table)`:
- `table_name(string)`: Override default glue table name
- `debug(boolean)`: Print extra info.
- `export_base_uri(string)`: Override the default prefix in S3 for symlink location i.e s3://other-bucket/path/
- `export_base_uri(string)`: Override the default prefix in S3 for symlink location e.g. s3://other-bucket/path/

When creating a glue table, the final table input will consist of the `create_table_input` input parameter and lakeFS computed defaults that will override it:
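As an illustration of the merge described above, a minimal `create_table_input` might carry only the data-format fields the exporter is agnostic to. This sketch is not taken from the commit; the `InputFormat`/`OutputFormat`/SerDe class names below are the common Hive symlink + Parquet values and are assumptions for this example:

```lua
-- Illustrative sketch (not from this commit): a minimal create_table_input
-- carrying only the data-format fields (InputFormat, OutputFormat, SerdeInfo)
-- that the exporter does not fill in. The class names are the usual Hive
-- symlink/Parquet ones and are assumptions here; table location and schema
-- are omitted since the lakeFS-computed defaults override them.
local create_table_input = {
  TableType = "EXTERNAL_TABLE",
  StorageDescriptor = {
    InputFormat = "org.apache.hadoop.hive.ql.io.SymlinkTextInputFormat",
    OutputFormat = "org.apache.hadoop.hive.ql.io.HiveIgnoreKeyTextOutputFormat",
    SerdeInfo = {
      SerializationLibrary = "org.apache.hadoop.hive.ql.io.parquet.serde.ParquetHiveSerDe",
    },
  },
}
```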

@@ -482,14 +482,14 @@ Generate glue table name.

Parameters:

- `descriptor(Table)`: Object from (i.e _lakefs_tables/my_table.yaml).
- `descriptor(Table)`: Object from (e.g. _lakefs_tables/my_table.yaml).
- `action_info(Table)`: The global action object.

### `lakefs/catalogexport/delta_exporter`

A package used to export Delta Lake tables from lakeFS to external cloud storage.

### `lakefs/catalogexport/delta_exporter.export_delta_log(action, table_paths, writer, delta_client)`
### `lakefs/catalogexport/delta_exporter.export_delta_log(action, table_paths, writer, delta_client, table_descriptors_path)`

The function used to export Delta Lake tables.
The return value is a table mapping table names to external table locations (from which the data can be queried).
@@ -500,6 +500,7 @@ Parameters:
- `table_paths`: Paths list in lakeFS to Delta Tables (e.g. `{"path/to/table1", "path/to/table2"}`)
- `writer`: A writer function with `function(bucket, key, data)` signature, used to write the exported Delta Log (e.g. `aws/s3.s3_client.put_object`)
- `delta_client`: A Delta Lake client that implements `get_table: function(repo, ref, prefix)`
- `table_descriptors_path`: The path under which the table descriptors of the provided `table_paths` reside

Example:

@@ -517,9 +518,10 @@ hooks:
local formats = require("formats")
local delta_exporter = require("lakefs/catalogexport/delta_exporter")
local table_descriptors_path = "_lakefs_tables"
local sc = aws.s3_client(args.aws.access_key_id, args.aws.secret_access_key, args.aws.region)
local delta_client = formats.delta_client(args.lakefs.access_key_id, args.lakefs.secret_access_key, args.aws.region)
local delta_table_locations = delta_exporter.export_delta_log(action, args.table_paths, sc.put_object, delta_client)
local delta_table_locations = delta_exporter.export_delta_log(action, args.table_paths, sc.put_object, delta_client, table_descriptors_path)
for t, loc in pairs(delta_table_locations) do
print("Delta Lake exported table \"" .. t .. "\"'s location: " .. loc .. "\n")
@@ -533,8 +535,14 @@ hooks:
access_key_id: <LAKEFS_ACCESS_KEY_ID>
secret_access_key: <LAKEFS_SECRET_ACCESS_KEY>
table_paths:
- delta
- delta2
- my/delta/table/path
```

For the table descriptor under `_lakefs_tables/my/delta/table/path.yaml`:
```yaml
---
name: myTableActualName
type: delta
```
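Since the `writer` argument is just a `function(bucket, key, data)`, the S3 client can be swapped for a local stub while debugging the export. A minimal sketch (the `make_debug_writer` helper is hypothetical, not part of lakeFS):

```lua
-- Hypothetical debug writer matching the function(bucket, key, data)
-- signature that export_delta_log expects. Instead of uploading to S3,
-- it records each write so the exported Delta log can be inspected locally.
local function make_debug_writer()
  local written = {}
  local writer = function(bucket, key, data)
    written[#written + 1] = { bucket = bucket, key = key, bytes = #data }
  end
  return writer, written
end

local writer, written = make_debug_writer()
writer("my-bucket", "table/_delta_log/00000000000000000000.json", "{}")
```

Passing `writer` in place of `sc.put_object` keeps the rest of the hook unchanged.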

### `path/parse(path_string)`
3 changes: 2 additions & 1 deletion examples/hooks/delta_lake_S3_export.lua
@@ -12,10 +12,11 @@ local aws = require("aws")
local formats = require("formats")
local delta_export = require("lakefs/catalogexport/delta_exporter")

local table_descriptors_path = "_lakefs_tables"
local sc = aws.s3_client(args.aws.access_key_id, args.aws.secret_access_key, args.aws.region)

local delta_client = formats.delta_client(args.lakefs.access_key_id, args.lakefs.secret_access_key, args.aws.region)
local delta_table_locations = delta_export.export_delta_log(action, args.table_paths, sc.put_object, delta_client)
local delta_table_locations = delta_export.export_delta_log(action, args.table_paths, sc.put_object, delta_client, table_descriptors_path)
for t, loc in pairs(delta_table_locations) do
print("Delta Lake exported table \"" .. t .. "\"'s location: " .. loc .. "\n")
end
15 changes: 10 additions & 5 deletions pkg/actions/lua/lakefs/catalogexport/delta_exporter.lua
@@ -1,8 +1,8 @@
local lakefs = require("lakefs")
local formats = require("formats")
local pathlib = require("path")
local json = require("encoding/json")
local utils = require("lakefs/catalogexport/internal")
local extractor = require("lakefs/catalogexport/table_extractor")

--[[
delta_log_entry_key_generator returns a closure that returns a Delta Lake version key according to the Delta Lake
@@ -41,7 +41,7 @@ end
- get_table: function(repo, ref, prefix)
]]
local function export_delta_log(action, table_paths, write_object, delta_client)
local function export_delta_log(action, table_paths, write_object, delta_client, table_descriptors_path)
local repo = action.repository_id
local commit_id = action.commit_id

@@ -100,9 +100,14 @@ local function export_delta_log(action, table_paths, write_object, delta_client)
end

-- Get the table delta log physical location
local t_name = pathlib.parse(path)["base_name"]
local table_src_path = pathlib.join("/", table_descriptors_path, path .. ".yaml")
local table_descriptor = extractor.get_table_descriptor(lakefs, repo, commit_id, table_src_path)
local table_name = table_descriptor.name
if not table_name then
error("table name is required to proceed with Delta catalog export")
end
local table_export_prefix = utils.get_storage_uri_prefix(ns, commit_id, action)
local table_physical_path = pathlib.join("/", table_export_prefix, t_name)
local table_physical_path = pathlib.join("/", table_export_prefix, table_name)
local table_log_physical_path = pathlib.join("/", table_physical_path, "_delta_log")

-- Upload the log to this physical_address
@@ -126,7 +131,7 @@ local function export_delta_log(action, table_paths, write_object, delta_client)
local version_key = storage_props.key .. "/" .. entry_version
write_object(storage_props.bucket, version_key, table_entry_string)
end
response[t_name] = table_physical_path
response[path] = table_physical_path
end
return response
end
21 changes: 17 additions & 4 deletions pkg/actions/testdata/lua/catalogexport_delta.lua
@@ -41,6 +41,17 @@ local function generate_physical_address(path)
return "s3://" .. path
end

package.loaded["lakefs/catalogexport/table_extractor"] = {
get_table_descriptor = function(_, _, _, table_src_path)
local t_name_yaml = pathlib.parse(table_src_path)
assert(t_name_yaml["base_name"] == ".yaml")
local t_name = pathlib.parse(t_name_yaml["parent"])
return {
name = t_name["base_name"]
}
end
}

package.loaded.lakefs = {
stat_object = function(_, _, path)
local parsed_path = pathlib.parse(path)
@@ -80,9 +91,9 @@ local function assert_physical_address(delta_table_locations, table_paths)

for _, table_path in ipairs(table_paths) do
local table_name = pathlib.parse(table_path)["base_name"]
local table_loc = delta_table_locations[table_name]
local table_loc = delta_table_locations[table_path]
if table_loc == nil then
error("missing table location: " .. table_name)
error("missing table location: " .. table_path)
end
local expected_location = pathlib.join("/", table_export_prefix, table_name)
if expected_location ~= table_loc then
@@ -106,7 +117,8 @@ local function assert_lakefs_stats(table_paths, content_paths)
end

local function assert_delta_log_content(delta_table_locations, table_to_physical_content)
for table_name, table_loc in pairs(delta_table_locations) do
for table_path, table_loc in pairs(delta_table_locations) do
local table_name = pathlib.parse(table_path)["base_name"]
local table_loc_key = utils.parse_storage_uri(table_loc).key
local content_table = table_to_physical_content[table_name]
if not content_table then
@@ -169,7 +181,8 @@ local delta_table_locations = delta_export.export_delta_log(
action,
test_table_paths,
mock_object_writer,
mock_delta_client(test_data.table_logs_content)
mock_delta_client(test_data.table_logs_content),
"some_path"
)

-- Test results
