Fix: Delta export path unescape
N-o-Z committed Feb 19, 2024
1 parent 2cc79fa commit 108e2b7
Showing 6 changed files with 139 additions and 5 deletions.
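
The escaping behavior this commit works around (tracked in https://github.com/treeverse/lakeFS/issues/7460, referenced in the test helper below): addresses built with Go's net/url come back percent-escaped, so a key containing a literal space such as "test partition/..." round-trips as "test%20partition/...", which names an object that does not exist in the store. A minimal Go sketch of that round trip (the bucket name is hypothetical; the key mirrors the test fixture):

package main

import (
	"fmt"
	"net/url"
)

func main() {
	// Hypothetical physical address; the key with a space mirrors the test data.
	u, err := url.Parse("s3://example-bucket/data/test partition/0-845b8a42-579e-47ee-9935-921dd8d2ba7d-0.parquet")
	if err != nil {
		panic(err)
	}

	// String() percent-escapes the path: .../test%20partition/...
	escaped := u.String()
	fmt.Println(escaped)

	// PathUnescape restores the literal key that actually exists in the store,
	// which is what uploadToPhysicalAddress below does before staging the object.
	unescaped, err := url.PathUnescape(escaped)
	if err != nil {
		panic(err)
	}
	fmt.Println(unescaped)
}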
108 changes: 106 additions & 2 deletions esti/catalog_export_test.go
@@ -423,7 +423,15 @@ func validateExportTestByStorageType(t *testing.T, ctx context.Context, commit s
namespaceURL, err := url.Parse(resp.JSON200.StorageNamespace)
require.NoError(t, err)
keyTempl := "%s/_lakefs/exported/%s/%s/test_table/_delta_log/00000000000000000000.json"
tableStat, err := client.StatObjectWithResponse(ctx, testData.Repository, mainBranch, &apigen.StatObjectParams{
Path: "tables/test-table/test partition/0-845b8a42-579e-47ee-9935-921dd8d2ba7d-0.parquet",
})
require.NoError(t, err)
require.NotNil(t, tableStat.JSON200)
expectedPath, err := url.Parse(tableStat.JSON200.PhysicalAddress)
require.NoError(t, err)

var reader io.ReadCloser
switch blockstoreType {
case block.BlockstoreTypeS3:
cfg, err := config.LoadDefaultConfig(ctx,
@@ -434,10 +442,11 @@ func validateExportTestByStorageType(t *testing.T, ctx context.Context, commit s

clt := s3.NewFromConfig(cfg)
key := fmt.Sprintf(keyTempl, strings.TrimPrefix(namespaceURL.Path, "/"), mainBranch, commit[:6])
_, err = clt.HeadObject(ctx, &s3.HeadObjectInput{
readResp, err := clt.GetObject(ctx, &s3.GetObjectInput{
Bucket: aws.String(namespaceURL.Host),
Key: aws.String(key)})
require.NoError(t, err)
reader = readResp.Body

case block.BlockstoreTypeAzure:
azClient, err := azure.BuildAzureServiceClient(params.Azure{
@@ -448,12 +457,21 @@ func validateExportTestByStorageType(t *testing.T, ctx context.Context, commit s

containerName, prefix, _ := strings.Cut(namespaceURL.Path, uri.PathSeparator)
key := fmt.Sprintf(keyTempl, strings.TrimPrefix(prefix, "/"), mainBranch, commit[:6])
_, err = azClient.NewContainerClient(containerName).NewBlobClient(key).GetProperties(ctx, nil)
readResp, err := azClient.NewContainerClient(containerName).NewBlockBlobClient(key).DownloadStream(ctx, nil)
require.NoError(t, err)
reader = readResp.Body

default:
t.Fatal("validation failed on unsupported block adapter")
}

defer func() {
err := reader.Close()
require.NoError(t, err)
}()
contents, err := io.ReadAll(reader)
require.NoError(t, err)
require.Contains(t, string(contents), expectedPath.String())
}

func TestDeltaCatalogExport(t *testing.T) {
@@ -509,3 +527,89 @@ func TestDeltaCatalogExport(t *testing.T) {
require.Equal(t, "delta_exporter", tasks.JSON200.Results[0].HookId)
validateExportTestByStorageType(t, ctx, headCommit.Id, testData, blockstore)
}

func TestDeltaCatalogImportExport(t *testing.T) {
ctx, _, repo := setupTest(t)
defer tearDownTest(repo)

requireBlockstoreType(t, block.BlockstoreTypeS3)
accessKeyID := viper.GetString("access_key_id")
secretAccessKey := viper.GetString("secret_access_key")
testData := &exportHooksTestData{
Repository: repo,
Branch: mainBranch,
LakeFSAccessKeyID: accessKeyID,
LakeFSSecretAccessKey: secretAccessKey,
}
blockstore := setupCatalogExportTestByStorageType(t, testData)
tmplDir, err := fs.Sub(exportHooksFiles, "export_hooks_files/delta")
require.NoError(t, err)
err = fs.WalkDir(tmplDir, "data", func(path string, d fs.DirEntry, err error) error {
if err != nil {
return err
}
if !d.IsDir() {
buf, err := fs.ReadFile(tmplDir, path)
if err != nil {
return err
}
uploadToPhysicalAddress(t, ctx, repo, mainBranch, strings.TrimPrefix(path, "data/"), string(buf))
}
return nil
})
require.NoError(t, err)

headCommit := uploadAndCommitObjects(t, ctx, repo, mainBranch, map[string]string{
"_lakefs_actions/delta_export.yaml": renderTplFileAsStr(t, testData, tmplDir, fmt.Sprintf("%s/_lakefs_actions/delta_export.yaml", blockstore)),
})

runs := waitForListRepositoryRunsLen(ctx, t, repo, headCommit.Id, 1)
run := runs.Results[0]
require.Equal(t, "completed", run.Status)

amount := apigen.PaginationAmount(1)
tasks, err := client.ListRunHooksWithResponse(ctx, repo, run.RunId, &apigen.ListRunHooksParams{
Amount: &amount,
})
require.NoError(t, err)
require.NotNil(t, tasks.JSON200)
require.Equal(t, 1, len(tasks.JSON200.Results))
require.Equal(t, "delta_exporter", tasks.JSON200.Results[0].HookId)
validateExportTestByStorageType(t, ctx, headCommit.Id, testData, blockstore)
}

func uploadToPhysicalAddress(t *testing.T, ctx context.Context, repo, branch, objPath, objContent string) {
t.Helper()
physicalAddress, err := url.Parse(getStorageNamespace(t, ctx, repo))
require.NoError(t, err)
physicalAddress = physicalAddress.JoinPath("data", objPath)

adapter, err := NewAdapter(physicalAddress.Scheme)
require.NoError(t, err)

stats, err := adapter.Upload(ctx, physicalAddress, strings.NewReader(objContent))
require.NoError(t, err)

mtime := stats.MTime.Unix()
unescapedAddress, err := url.PathUnescape(physicalAddress.String()) // catch: https://github.com/treeverse/lakeFS/issues/7460
require.NoError(t, err)

resp, err := client.StageObjectWithResponse(ctx, repo, branch, &apigen.StageObjectParams{
Path: objPath,
}, apigen.StageObjectJSONRequestBody{
Checksum: stats.ETag,
Mtime: &mtime,
PhysicalAddress: unescapedAddress,
SizeBytes: stats.Size,
})
require.NoError(t, err)
require.NotNil(t, resp.JSON201)
}

func getStorageNamespace(t *testing.T, ctx context.Context, repo string) string {
t.Helper()
resp, err := client.GetRepositoryWithResponse(ctx, repo)
require.NoError(t, err)
require.NotNil(t, resp.JSON200)
return resp.JSON200.StorageNamespace
}
@@ -1,4 +1,4 @@
{"protocol":{"minReaderVersion":1,"minWriterVersion":2}}
{"metaData":{"id":"db5e0917-1716-4b0f-a009-c25e5b7304a1","name":null,"description":null,"format":{"provider":"parquet","options":{}},"schemaString":"{\"type\":\"struct\",\"fields\":[{\"name\":\"registration_dttm\",\"type\":\"timestamp\",\"nullable\":true,\"metadata\":{}},{\"name\":\"id\",\"type\":\"integer\",\"nullable\":true,\"metadata\":{}},{\"name\":\"first_name\",\"type\":\"string\",\"nullable\":true,\"metadata\":{}},{\"name\":\"last_name\",\"type\":\"string\",\"nullable\":true,\"metadata\":{}},{\"name\":\"email\",\"type\":\"string\",\"nullable\":true,\"metadata\":{}},{\"name\":\"gender\",\"type\":\"string\",\"nullable\":true,\"metadata\":{}},{\"name\":\"ip_address\",\"type\":\"string\",\"nullable\":true,\"metadata\":{}},{\"name\":\"cc\",\"type\":\"string\",\"nullable\":true,\"metadata\":{}},{\"name\":\"country\",\"type\":\"string\",\"nullable\":true,\"metadata\":{}},{\"name\":\"birthdate\",\"type\":\"string\",\"nullable\":true,\"metadata\":{}},{\"name\":\"salary\",\"type\":\"double\",\"nullable\":true,\"metadata\":{}},{\"name\":\"title\",\"type\":\"string\",\"nullable\":true,\"metadata\":{}},{\"name\":\"comments\",\"type\":\"string\",\"nullable\":true,\"metadata\":{}},{\"name\":\"__index_level_0__\",\"type\":\"long\",\"nullable\":true,\"metadata\":{}}]}","partitionColumns":[],"createdTime":1707066829815,"configuration":{}}}
{"add":{"path":"0-845b8a42-579e-47ee-9935-921dd8d2ba7d-0.parquet","partitionValues":{},"size":6434,"modificationTime":1707066829810,"dataChange":true,"stats":"{\"numRecords\": 11, \"minValues\": {\"registration_dttm\": \"2016-02-03T00:35:29\", \"id\": 137, \"first_name\": \"Alan\", \"last_name\": \"Cox\", \"email\": \"[email protected]\", \"gender\": \"Female\", \"ip_address\": \"115.51.190.173\", \"cc\": \"\", \"country\": \"Brazil\", \"birthdate\": \"\", \"salary\": 32792.52, \"title\": \"\", \"comments\": \"\", \"__index_level_0__\": 136}, \"maxValues\": {\"registration_dttm\": \"2016-02-03T23:47:37\", \"id\": 860, \"first_name\": \"Sharon\", \"last_name\": \"Vasquez\", \"email\": \"[email protected]\", \"gender\": \"Male\", \"ip_address\": \"46.161.44.180\", \"cc\": \"630472604816980983\", \"country\": \"Philippines\", \"birthdate\": \"9/27/1984\", \"salary\": 278553.57, \"title\": \"Systems Administrator I\", \"comments\": \"\", \"__index_level_0__\": 859}, \"nullCount\": {\"registration_dttm\": 0, \"id\": 0, \"first_name\": 0, \"last_name\": 0, \"email\": 0, \"gender\": 0, \"ip_address\": 0, \"cc\": 0, \"country\": 0, \"birthdate\": 0, \"salary\": 0, \"title\": 0, \"comments\": 0, \"__index_level_0__\": 0}}","tags":null,"deletionVector":null,"baseRowId":null,"defaultRowCommitVersion":null,"clusteringProvider":null}}
{"add":{"path":"test%20partition/0-845b8a42-579e-47ee-9935-921dd8d2ba7d-0.parquet","partitionValues":{},"size":6434,"modificationTime":1707066829810,"dataChange":true,"stats":"{\"numRecords\": 11, \"minValues\": {\"registration_dttm\": \"2016-02-03T00:35:29\", \"id\": 137, \"first_name\": \"Alan\", \"last_name\": \"Cox\", \"email\": \"[email protected]\", \"gender\": \"Female\", \"ip_address\": \"115.51.190.173\", \"cc\": \"\", \"country\": \"Brazil\", \"birthdate\": \"\", \"salary\": 32792.52, \"title\": \"\", \"comments\": \"\", \"__index_level_0__\": 136}, \"maxValues\": {\"registration_dttm\": \"2016-02-03T23:47:37\", \"id\": 860, \"first_name\": \"Sharon\", \"last_name\": \"Vasquez\", \"email\": \"[email protected]\", \"gender\": \"Male\", \"ip_address\": \"46.161.44.180\", \"cc\": \"630472604816980983\", \"country\": \"Philippines\", \"birthdate\": \"9/27/1984\", \"salary\": 278553.57, \"title\": \"Systems Administrator I\", \"comments\": \"\", \"__index_level_0__\": 859}, \"nullCount\": {\"registration_dttm\": 0, \"id\": 0, \"first_name\": 0, \"last_name\": 0, \"email\": 0, \"gender\": 0, \"ip_address\": 0, \"cc\": 0, \"country\": 0, \"birthdate\": 0, \"salary\": 0, \"title\": 0, \"comments\": 0, \"__index_level_0__\": 0}}","tags":null,"deletionVector":null,"baseRowId":null,"defaultRowCommitVersion":null,"clusteringProvider":null}}
{"commitInfo":{"timestamp":1707066829820,"operation":"CREATE TABLE","operationParameters":{"protocol":"{\"minReaderVersion\":1,\"minWriterVersion\":2}","metadata":"{\"configuration\":{},\"created_time\":1707066829815,\"description\":null,\"format\":{\"options\":{},\"provider\":\"parquet\"},\"id\":\"db5e0917-1716-4b0f-a009-c25e5b7304a1\",\"name\":null,\"partition_columns\":[],\"schema\":{\"fields\":[{\"metadata\":{},\"name\":\"registration_dttm\",\"nullable\":true,\"type\":\"timestamp\"},{\"metadata\":{},\"name\":\"id\",\"nullable\":true,\"type\":\"integer\"},{\"metadata\":{},\"name\":\"first_name\",\"nullable\":true,\"type\":\"string\"},{\"metadata\":{},\"name\":\"last_name\",\"nullable\":true,\"type\":\"string\"},{\"metadata\":{},\"name\":\"email\",\"nullable\":true,\"type\":\"string\"},{\"metadata\":{},\"name\":\"gender\",\"nullable\":true,\"type\":\"string\"},{\"metadata\":{},\"name\":\"ip_address\",\"nullable\":true,\"type\":\"string\"},{\"metadata\":{},\"name\":\"cc\",\"nullable\":true,\"type\":\"string\"},{\"metadata\":{},\"name\":\"country\",\"nullable\":true,\"type\":\"string\"},{\"metadata\":{},\"name\":\"birthdate\",\"nullable\":true,\"type\":\"string\"},{\"metadata\":{},\"name\":\"salary\",\"nullable\":true,\"type\":\"double\"},{\"metadata\":{},\"name\":\"title\",\"nullable\":true,\"type\":\"string\"},{\"metadata\":{},\"name\":\"comments\",\"nullable\":true,\"type\":\"string\"},{\"metadata\":{},\"name\":\"__index_level_0__\",\"nullable\":true,\"type\":\"long\"}],\"type\":\"struct\"}}","mode":"ErrorIfExists","location":"s3a://delta-lake-demo/main/data"},"clientVersion":"delta-rs.0.17.0"}}
3 changes: 2 additions & 1 deletion pkg/actions/lua/lakefs/catalogexport/delta_exporter.lua
Expand Up @@ -111,7 +111,8 @@ local function export_delta_log(action, table_def_names, write_object, delta_cli
local code, obj = lakefs.stat_object(repo, commit_id, unescaped_path)
if code == 200 then
local obj_stat = json.unmarshal(obj)
local physical_path = obj_stat["physical_address"]
local u = url.parse(obj_stat["physical_address"])
local physical_path = url.build_url(u["scheme"], u["host"], u["path"])
if entry.add ~= nil then
entry.add.path = physical_path
elseif entry.remove ~= nil then
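
The Lua change above stops writing stat_object's physical_address into the log verbatim; it parses the address and rebuilds it from scheme, host, and path only, so any query string or fragment is dropped and the path is re-escaped canonically. A rough Go equivalent of that normalization (the Lua url module wraps net/url, so the semantics should match; the address below is hypothetical):

package main

import (
	"fmt"
	neturl "net/url"
)

func main() {
	// Hypothetical physical address as returned by stat_object.
	addr := "s3://example-bucket/data/test partition/0-845b8a42-579e-47ee-9935-921dd8d2ba7d-0.parquet"

	u, err := neturl.Parse(addr)
	if err != nil {
		panic(err)
	}

	// Keep only scheme, host, and (decoded) path; String() re-escapes the path
	// canonically, mirroring url.build_url(u["scheme"], u["host"], u["path"]).
	rebuilt := (&neturl.URL{Scheme: u.Scheme, Host: u.Host, Path: u.Path}).String()
	fmt.Println(rebuilt) // s3://example-bucket/data/test%20partition/...
}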
2 changes: 1 addition & 1 deletion pkg/actions/lua/lakefs/catalogexport/table_extractor.lua
Expand Up @@ -40,7 +40,7 @@ end
local function get_table_descriptor(client, repo_id, commit_id, logical_path)
local code, content = client.get_object(repo_id, commit_id, logical_path)
if code ~= 200 then
error("could not fetch data file: HTTP " .. tostring(code) , "path: ", logical_path)
error("could not fetch data file: HTTP " .. tostring(code) .. " path: " .. logical_path)
end
local descriptor = yaml.unmarshal(content)
descriptor.partition_columns = descriptor.partition_columns or {}
29 changes: 29 additions & 0 deletions pkg/actions/lua/net/url/url.go
@@ -19,6 +19,8 @@ func Open(l *lua.State) {
var library = []lua.RegistryFunction{
{Name: "parse", Function: parse},
{Name: "query_unescape", Function: queryUnescape},
{Name: "path_escape", Function: pathEscape},
{Name: "build_url", Function: build},
}

func parse(l *lua.State) int {
@@ -47,3 +49,30 @@ func queryUnescape(l *lua.State) int {
l.PushString(qu)
return 1
}

func pathEscape(l *lua.State) int {
path := lua.CheckString(l, 1)
ep := neturl.PathEscape(path)
l.PushString(ep)
return 1
}

func build(l *lua.State) int {
scheme := lua.CheckString(l, 1)
host := lua.CheckString(l, 2)
u := neturl.URL{
Scheme: scheme,
Host: host,
}
if !l.IsNone(3) {
u.Path = lua.CheckString(l, 3)
}
if !l.IsNone(4) {
u.RawQuery = lua.CheckString(l, 4)
}
if !l.IsNone(5) {
u.Fragment = lua.CheckString(l, 5)
}
l.PushString(u.String())
return 1
}
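
build_url mirrors Go's URL assembly: scheme and host are required, while path, query, and fragment are optional trailing arguments. A sketch of the equivalent Go, with hypothetical values (the binding pushes neturl.URL{...}.String()):

package main

import (
	"fmt"
	neturl "net/url"
)

func main() {
	// Roughly what build("https", "example.test", "/a b", "x=1") would push,
	// assembled directly in Go.
	u := neturl.URL{
		Scheme:   "https",
		Host:     "example.test",
		Path:     "/a b", // optional third argument: decoded path, as url.parse returns it
		RawQuery: "x=1",  // optional fourth argument
	}
	fmt.Println(u.String()) // https://example.test/a%20b?x=1
}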
