From 108e2b76e62d0d1a304039f209e378a9f8cd27f4 Mon Sep 17 00:00:00 2001 From: Nir Ozery Date: Sat, 17 Feb 2024 20:12:12 +0200 Subject: [PATCH] Fix: Delta export path unescape --- esti/catalog_export_test.go | 108 +++++++++++++++++- .../_delta_log/00000000000000000000.json | 2 +- ...8a42-579e-47ee-9935-921dd8d2ba7d-0.parquet | Bin .../lakefs/catalogexport/delta_exporter.lua | 3 +- .../lakefs/catalogexport/table_extractor.lua | 2 +- pkg/actions/lua/net/url/url.go | 29 +++++ 6 files changed, 139 insertions(+), 5 deletions(-) rename esti/export_hooks_files/delta/data/tables/test-table/{ => test partition}/0-845b8a42-579e-47ee-9935-921dd8d2ba7d-0.parquet (100%) diff --git a/esti/catalog_export_test.go b/esti/catalog_export_test.go index 6cd719f07dd..858593830f1 100644 --- a/esti/catalog_export_test.go +++ b/esti/catalog_export_test.go @@ -423,7 +423,15 @@ func validateExportTestByStorageType(t *testing.T, ctx context.Context, commit s namespaceURL, err := url.Parse(resp.JSON200.StorageNamespace) require.NoError(t, err) keyTempl := "%s/_lakefs/exported/%s/%s/test_table/_delta_log/00000000000000000000.json" + tableStat, err := client.StatObjectWithResponse(ctx, testData.Repository, mainBranch, &apigen.StatObjectParams{ + Path: "tables/test-table/test partition/0-845b8a42-579e-47ee-9935-921dd8d2ba7d-0.parquet", + }) + require.NoError(t, err) + require.NotNil(t, tableStat.JSON200) + expectedPath, err := url.Parse(tableStat.JSON200.PhysicalAddress) + require.NoError(t, err) + var reader io.ReadCloser switch blockstoreType { case block.BlockstoreTypeS3: cfg, err := config.LoadDefaultConfig(ctx, @@ -434,10 +442,11 @@ func validateExportTestByStorageType(t *testing.T, ctx context.Context, commit s clt := s3.NewFromConfig(cfg) key := fmt.Sprintf(keyTempl, strings.TrimPrefix(namespaceURL.Path, "/"), mainBranch, commit[:6]) - _, err = clt.HeadObject(ctx, &s3.HeadObjectInput{ + readResp, err := clt.GetObject(ctx, &s3.GetObjectInput{ Bucket: aws.String(namespaceURL.Host), Key: aws.String(key)}) require.NoError(t, err) + reader = readResp.Body case block.BlockstoreTypeAzure: azClient, err := azure.BuildAzureServiceClient(params.Azure{ @@ -448,12 +457,21 @@ func validateExportTestByStorageType(t *testing.T, ctx context.Context, commit s containerName, prefix, _ := strings.Cut(namespaceURL.Path, uri.PathSeparator) key := fmt.Sprintf(keyTempl, strings.TrimPrefix(prefix, "/"), mainBranch, commit[:6]) - _, err = azClient.NewContainerClient(containerName).NewBlobClient(key).GetProperties(ctx, nil) + readResp, err := azClient.NewContainerClient(containerName).NewBlockBlobClient(key).DownloadStream(ctx, nil) require.NoError(t, err) + reader = readResp.Body default: t.Fatal("validation failed on unsupported block adapter") } + + defer func() { + err := reader.Close() + require.NoError(t, err) + }() + contents, err := io.ReadAll(reader) + require.NoError(t, err) + require.Contains(t, string(contents), expectedPath.String()) } func TestDeltaCatalogExport(t *testing.T) { @@ -509,3 +527,89 @@ func TestDeltaCatalogExport(t *testing.T) { require.Equal(t, "delta_exporter", tasks.JSON200.Results[0].HookId) validateExportTestByStorageType(t, ctx, headCommit.Id, testData, blockstore) } + +func TestDeltaCatalogImportExport(t *testing.T) { + ctx, _, repo := setupTest(t) + defer tearDownTest(repo) + + requireBlockstoreType(t, block.BlockstoreTypeS3) + accessKeyID := viper.GetString("access_key_id") + secretAccessKey := viper.GetString("secret_access_key") + testData := &exportHooksTestData{ + Repository: repo, + Branch: mainBranch, + LakeFSAccessKeyID: accessKeyID, + LakeFSSecretAccessKey: secretAccessKey, + } + blockstore := setupCatalogExportTestByStorageType(t, testData) + tmplDir, err := fs.Sub(exportHooksFiles, "export_hooks_files/delta") + require.NoError(t, err) + err = fs.WalkDir(tmplDir, "data", func(path string, d fs.DirEntry, err error) error { + if err != nil { + return err + } + if !d.IsDir() { + buf, err := fs.ReadFile(tmplDir, path) + if err != nil { + return err + } + uploadToPhysicalAddress(t, ctx, repo, mainBranch, strings.TrimPrefix(path, "data/"), string(buf)) + } + return nil + }) + require.NoError(t, err) + + headCommit := uploadAndCommitObjects(t, ctx, repo, mainBranch, map[string]string{ + "_lakefs_actions/delta_export.yaml": renderTplFileAsStr(t, testData, tmplDir, fmt.Sprintf("%s/_lakefs_actions/delta_export.yaml", blockstore)), + }) + + runs := waitForListRepositoryRunsLen(ctx, t, repo, headCommit.Id, 1) + run := runs.Results[0] + require.Equal(t, "completed", run.Status) + + amount := apigen.PaginationAmount(1) + tasks, err := client.ListRunHooksWithResponse(ctx, repo, run.RunId, &apigen.ListRunHooksParams{ + Amount: &amount, + }) + require.NoError(t, err) + require.NotNil(t, tasks.JSON200) + require.Equal(t, 1, len(tasks.JSON200.Results)) + require.Equal(t, "delta_exporter", tasks.JSON200.Results[0].HookId) + validateExportTestByStorageType(t, ctx, headCommit.Id, testData, blockstore) +} + +func uploadToPhysicalAddress(t *testing.T, ctx context.Context, repo, branch, objPath, objContent string) { + t.Helper() + physicalAddress, err := url.Parse(getStorageNamespace(t, ctx, repo)) + require.NoError(t, err) + physicalAddress = physicalAddress.JoinPath("data", objPath) + + adapter, err := NewAdapter(physicalAddress.Scheme) + require.NoError(t, err) + + stats, err := adapter.Upload(ctx, physicalAddress, strings.NewReader(objContent)) + require.NoError(t, err) + + mtime := stats.MTime.Unix() + unescapedAddress, err := url.PathUnescape(physicalAddress.String()) // catch: https://github.com/treeverse/lakeFS/issues/7460 + require.NoError(t, err) + + resp, err := client.StageObjectWithResponse(ctx, repo, branch, &apigen.StageObjectParams{ + Path: objPath, + }, apigen.StageObjectJSONRequestBody{ + Checksum: stats.ETag, + Mtime: &mtime, + PhysicalAddress: unescapedAddress, + SizeBytes: stats.Size, + }) + require.NoError(t, err) + require.NotNil(t, resp.JSON201) +} + +func getStorageNamespace(t *testing.T, ctx context.Context, repo string) string { + t.Helper() + resp, err := client.GetRepositoryWithResponse(ctx, repo) + require.NoError(t, err) + require.NotNil(t, resp.JSON200) + return resp.JSON200.StorageNamespace +} diff --git a/esti/export_hooks_files/delta/data/tables/test-table/_delta_log/00000000000000000000.json b/esti/export_hooks_files/delta/data/tables/test-table/_delta_log/00000000000000000000.json index cabe7fc0d3f..8c26d78b73c 100644 --- a/esti/export_hooks_files/delta/data/tables/test-table/_delta_log/00000000000000000000.json +++ b/esti/export_hooks_files/delta/data/tables/test-table/_delta_log/00000000000000000000.json @@ -1,4 +1,4 @@ {"protocol":{"minReaderVersion":1,"minWriterVersion":2}} {"metaData":{"id":"db5e0917-1716-4b0f-a009-c25e5b7304a1","name":null,"description":null,"format":{"provider":"parquet","options":{}},"schemaString":"{\"type\":\"struct\",\"fields\":[{\"name\":\"registration_dttm\",\"type\":\"timestamp\",\"nullable\":true,\"metadata\":{}},{\"name\":\"id\",\"type\":\"integer\",\"nullable\":true,\"metadata\":{}},{\"name\":\"first_name\",\"type\":\"string\",\"nullable\":true,\"metadata\":{}},{\"name\":\"last_name\",\"type\":\"string\",\"nullable\":true,\"metadata\":{}},{\"name\":\"email\",\"type\":\"string\",\"nullable\":true,\"metadata\":{}},{\"name\":\"gender\",\"type\":\"string\",\"nullable\":true,\"metadata\":{}},{\"name\":\"ip_address\",\"type\":\"string\",\"nullable\":true,\"metadata\":{}},{\"name\":\"cc\",\"type\":\"string\",\"nullable\":true,\"metadata\":{}},{\"name\":\"country\",\"type\":\"string\",\"nullable\":true,\"metadata\":{}},{\"name\":\"birthdate\",\"type\":\"string\",\"nullable\":true,\"metadata\":{}},{\"name\":\"salary\",\"type\":\"double\",\"nullable\":true,\"metadata\":{}},{\"name\":\"title\",\"type\":\"string\",\"nullable\":true,\"metadata\":{}},{\"name\":\"comments\",\"type\":\"string\",\"nullable\":true,\"metadata\":{}},{\"name\":\"__index_level_0__\",\"type\":\"long\",\"nullable\":true,\"metadata\":{}}]}","partitionColumns":[],"createdTime":1707066829815,"configuration":{}}} -{"add":{"path":"0-845b8a42-579e-47ee-9935-921dd8d2ba7d-0.parquet","partitionValues":{},"size":6434,"modificationTime":1707066829810,"dataChange":true,"stats":"{\"numRecords\": 11, \"minValues\": {\"registration_dttm\": \"2016-02-03T00:35:29\", \"id\": 137, \"first_name\": \"Alan\", \"last_name\": \"Cox\", \"email\": \"amartinezkk@wunderground.com\", \"gender\": \"Female\", \"ip_address\": \"115.51.190.173\", \"cc\": \"\", \"country\": \"Brazil\", \"birthdate\": \"\", \"salary\": 32792.52, \"title\": \"\", \"comments\": \"\", \"__index_level_0__\": 136}, \"maxValues\": {\"registration_dttm\": \"2016-02-03T23:47:37\", \"id\": 860, \"first_name\": \"Sharon\", \"last_name\": \"Vasquez\", \"email\": \"sgriffinhe@myspace.com\", \"gender\": \"Male\", \"ip_address\": \"46.161.44.180\", \"cc\": \"630472604816980983\", \"country\": \"Philippines\", \"birthdate\": \"9/27/1984\", \"salary\": 278553.57, \"title\": \"Systems Administrator I\", \"comments\": \"\", \"__index_level_0__\": 859}, \"nullCount\": {\"registration_dttm\": 0, \"id\": 0, \"first_name\": 0, \"last_name\": 0, \"email\": 0, \"gender\": 0, \"ip_address\": 0, \"cc\": 0, \"country\": 0, \"birthdate\": 0, \"salary\": 0, \"title\": 0, \"comments\": 0, \"__index_level_0__\": 0}}","tags":null,"deletionVector":null,"baseRowId":null,"defaultRowCommitVersion":null,"clusteringProvider":null}} +{"add":{"path":"test%20partition/0-845b8a42-579e-47ee-9935-921dd8d2ba7d-0.parquet","partitionValues":{},"size":6434,"modificationTime":1707066829810,"dataChange":true,"stats":"{\"numRecords\": 11, \"minValues\": {\"registration_dttm\": \"2016-02-03T00:35:29\", \"id\": 137, \"first_name\": \"Alan\", \"last_name\": \"Cox\", \"email\": \"amartinezkk@wunderground.com\", \"gender\": \"Female\", \"ip_address\": \"115.51.190.173\", \"cc\": \"\", \"country\": \"Brazil\", \"birthdate\": \"\", \"salary\": 32792.52, \"title\": \"\", \"comments\": \"\", \"__index_level_0__\": 136}, \"maxValues\": {\"registration_dttm\": \"2016-02-03T23:47:37\", \"id\": 860, \"first_name\": \"Sharon\", \"last_name\": \"Vasquez\", \"email\": \"sgriffinhe@myspace.com\", \"gender\": \"Male\", \"ip_address\": \"46.161.44.180\", \"cc\": \"630472604816980983\", \"country\": \"Philippines\", \"birthdate\": \"9/27/1984\", \"salary\": 278553.57, \"title\": \"Systems Administrator I\", \"comments\": \"\", \"__index_level_0__\": 859}, \"nullCount\": {\"registration_dttm\": 0, \"id\": 0, \"first_name\": 0, \"last_name\": 0, \"email\": 0, \"gender\": 0, \"ip_address\": 0, \"cc\": 0, \"country\": 0, \"birthdate\": 0, \"salary\": 0, \"title\": 0, \"comments\": 0, \"__index_level_0__\": 0}}","tags":null,"deletionVector":null,"baseRowId":null,"defaultRowCommitVersion":null,"clusteringProvider":null}} {"commitInfo":{"timestamp":1707066829820,"operation":"CREATE TABLE","operationParameters":{"protocol":"{\"minReaderVersion\":1,\"minWriterVersion\":2}","metadata":"{\"configuration\":{},\"created_time\":1707066829815,\"description\":null,\"format\":{\"options\":{},\"provider\":\"parquet\"},\"id\":\"db5e0917-1716-4b0f-a009-c25e5b7304a1\",\"name\":null,\"partition_columns\":[],\"schema\":{\"fields\":[{\"metadata\":{},\"name\":\"registration_dttm\",\"nullable\":true,\"type\":\"timestamp\"},{\"metadata\":{},\"name\":\"id\",\"nullable\":true,\"type\":\"integer\"},{\"metadata\":{},\"name\":\"first_name\",\"nullable\":true,\"type\":\"string\"},{\"metadata\":{},\"name\":\"last_name\",\"nullable\":true,\"type\":\"string\"},{\"metadata\":{},\"name\":\"email\",\"nullable\":true,\"type\":\"string\"},{\"metadata\":{},\"name\":\"gender\",\"nullable\":true,\"type\":\"string\"},{\"metadata\":{},\"name\":\"ip_address\",\"nullable\":true,\"type\":\"string\"},{\"metadata\":{},\"name\":\"cc\",\"nullable\":true,\"type\":\"string\"},{\"metadata\":{},\"name\":\"country\",\"nullable\":true,\"type\":\"string\"},{\"metadata\":{},\"name\":\"birthdate\",\"nullable\":true,\"type\":\"string\"},{\"metadata\":{},\"name\":\"salary\",\"nullable\":true,\"type\":\"double\"},{\"metadata\":{},\"name\":\"title\",\"nullable\":true,\"type\":\"string\"},{\"metadata\":{},\"name\":\"comments\",\"nullable\":true,\"type\":\"string\"},{\"metadata\":{},\"name\":\"__index_level_0__\",\"nullable\":true,\"type\":\"long\"}],\"type\":\"struct\"}}","mode":"ErrorIfExists","location":"s3a://delta-lake-demo/main/data"},"clientVersion":"delta-rs.0.17.0"}} \ No newline at end of file diff --git a/esti/export_hooks_files/delta/data/tables/test-table/0-845b8a42-579e-47ee-9935-921dd8d2ba7d-0.parquet b/esti/export_hooks_files/delta/data/tables/test-table/test partition/0-845b8a42-579e-47ee-9935-921dd8d2ba7d-0.parquet similarity index 100% rename from esti/export_hooks_files/delta/data/tables/test-table/0-845b8a42-579e-47ee-9935-921dd8d2ba7d-0.parquet rename to esti/export_hooks_files/delta/data/tables/test-table/test partition/0-845b8a42-579e-47ee-9935-921dd8d2ba7d-0.parquet diff --git a/pkg/actions/lua/lakefs/catalogexport/delta_exporter.lua b/pkg/actions/lua/lakefs/catalogexport/delta_exporter.lua index 44e6151664e..a1f7d19e1c8 100644 --- a/pkg/actions/lua/lakefs/catalogexport/delta_exporter.lua +++ b/pkg/actions/lua/lakefs/catalogexport/delta_exporter.lua @@ -111,7 +111,8 @@ local function export_delta_log(action, table_def_names, write_object, delta_cli local code, obj = lakefs.stat_object(repo, commit_id, unescaped_path) if code == 200 then local obj_stat = json.unmarshal(obj) - local physical_path = obj_stat["physical_address"] + local u = url.parse(obj_stat["physical_address"]) + local physical_path = url.build_url(u["scheme"], u["host"], u["path"]) if entry.add ~= nil then entry.add.path = physical_path elseif entry.remove ~= nil then diff --git a/pkg/actions/lua/lakefs/catalogexport/table_extractor.lua b/pkg/actions/lua/lakefs/catalogexport/table_extractor.lua index 95e721151e0..0b39bf1d3e2 100644 --- a/pkg/actions/lua/lakefs/catalogexport/table_extractor.lua +++ b/pkg/actions/lua/lakefs/catalogexport/table_extractor.lua @@ -40,7 +40,7 @@ end local function get_table_descriptor(client, repo_id, commit_id, logical_path) local code, content = client.get_object(repo_id, commit_id, logical_path) if code ~= 200 then - error("could not fetch data file: HTTP " .. tostring(code) , "path: ", logical_path) + error("could not fetch data file: HTTP " .. tostring(code) .. " path: " .. logical_path) end local descriptor = yaml.unmarshal(content) descriptor.partition_columns = descriptor.partition_columns or {} diff --git a/pkg/actions/lua/net/url/url.go b/pkg/actions/lua/net/url/url.go index c8421ca5b86..c51c30ac026 100644 --- a/pkg/actions/lua/net/url/url.go +++ b/pkg/actions/lua/net/url/url.go @@ -19,6 +19,8 @@ func Open(l *lua.State) { var library = []lua.RegistryFunction{ {Name: "parse", Function: parse}, {Name: "query_unescape", Function: queryUnescape}, + {Name: "path_escape", Function: pathEscape}, + {Name: "build_url", Function: build}, } func parse(l *lua.State) int { @@ -47,3 +49,30 @@ func queryUnescape(l *lua.State) int { l.PushString(qu) return 1 } + +func pathEscape(l *lua.State) int { + path := lua.CheckString(l, 1) + ep := neturl.PathEscape(path) + l.PushString(ep) + return 1 +} + +func build(l *lua.State) int { + scheme := lua.CheckString(l, 1) + host := lua.CheckString(l, 2) + u := neturl.URL{ + Scheme: scheme, + Host: host, + } + if !l.IsNone(3) { + u.Path = lua.CheckString(l, 3) + } + if !l.IsNone(4) { + u.RawQuery = lua.CheckString(l, 4) + } + if !l.IsNone(5) { + u.Fragment = lua.CheckString(l, 3) + } + l.PushString(u.String()) + return 1 +}