-
Notifications
You must be signed in to change notification settings - Fork 365
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
Fix: lakectl local diff slowness #7842
Conversation
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Not sure about this code. We should find a way to test this very precisely.
return nil | ||
} | ||
if info.IsDir() { | ||
// TODO: We don't return dir results for the listing, how will this effect directory markers, and can we even support directory markers? |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
This is a potentially severe product limitation in lakectl local
:
- Run Spark directly on lakeFS.
- Get that branch into
lakectl local
. - Commit the branch "unchanged".
Won't we see (many) changes?
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
This comment is not a new behavior. We didn't support directory markers also before. It came up while I was writing the fix and I thought it was worth mentioning
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Can we open an issue for this?
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
pkg/local/diff.go
Outdated
} | ||
|
||
// Sort the queue since adding the suffix might the order might change after adding the suffix | ||
sort.Strings(dirQueue) |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
I don't understand: we sort the queue on every object?! Given that sorting requires at least seeing every object on the queue, this will be slow!
Example
Consider this directory structure:
a0000/a
a0001/a
a0002/a
[...]
a9999/a
z0000
z0001
z0002
[...]
z9999
It contains 10000 directories and 20000 objects in total. I would expect the code to try to sort all 10000 a
directories for each of the 10000 z
files. Given that sorting is
This sort of thing can and will happen for many cases. For instance consider a lakeFS repository that contains two tables, each partitioned into 10000 parts. 100M operations guaranteed.
Possible fix
Use at least a priority queue to handle these.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
We are sorting all directories that precede a given file (if exists) when walking.
This is necessary since we need to process these directories prior to processing the file itself.
Nevertheless, I have moved the sort to inside the if condition and improved the condition itself
pkg/local/diff.go
Outdated
if queueLen > 0 && p > dirQueue[len(dirQueue)-1] { | ||
// if file > dirs, handle the dirs first | ||
for _, dir := range dirQueue { | ||
if err = WalkS3(dir, callbackFunc); err != nil { | ||
return err | ||
} | ||
} | ||
dirQueue = []string{} // Empty queue once finished processing dirs | ||
} |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
I do not think that this can be correct. Consider the case
dirQueue[0] < p && p < dirQueue[len(dirQueue)-1]
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
That is impossible:
for simplicity lets call dirQueue[len(dirQueue)-1] last
and dirQueue[0] first
if p < last
==> p < last - "/"
(because otherwise the walk function would have returned p before last
if first
< p ==> first - "/"
< p
Therefore it mandates that first - "/"
< p < last - "/"
which means that we should have gotten p from the walk function somewhere between getting first
and last
and we get a contradiction
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
I hope the explanation is convincing 😃
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
I'm retracting my previous statement, I have not taken into account different string length. I have added the code that deals with this scenario as well as a unit test
Co-authored-by: Ariel Shaqed (Scolnicov) <[email protected]>
Thanks for the review. |
Looks like it found somethine almost immediately :-/ AFAIU tests will run _existing_ failures in testdata/fuzz. You can fuzz more by running something like: ```sh $ go test -v ./pkg/local/ -fuzz FuzzWalkS3 --fuzztime 5m === RUN TestDiffLocal === RUN TestDiffLocal/t1_no_diff === RUN TestDiffLocal/t1_modified === RUN TestDiffLocal/t1_local_before === RUN TestDiffLocal/t1_local_after === RUN TestDiffLocal/t1_hidden_changed --- PASS: TestDiffLocal (0.00s) --- PASS: TestDiffLocal/t1_no_diff (0.00s) --- PASS: TestDiffLocal/t1_modified (0.00s) --- PASS: TestDiffLocal/t1_local_before (0.00s) --- PASS: TestDiffLocal/t1_local_after (0.00s) --- PASS: TestDiffLocal/t1_hidden_changed (0.00s) === RUN TestWalkS3 === RUN TestWalkS3/reverse_order === RUN TestWalkS3/file_in_the_middle === RUN TestWalkS3/dirs_at_the_end === RUN TestWalkS3/files_mixed_with_directories --- PASS: TestWalkS3 (0.00s) --- PASS: TestWalkS3/reverse_order (0.00s) --- PASS: TestWalkS3/file_in_the_middle (0.00s) --- PASS: TestWalkS3/dirs_at_the_end (0.00s) --- PASS: TestWalkS3/files_mixed_with_directories (0.00s) === RUN TestWriteIndex --- PASS: TestWriteIndex (0.00s) === RUN TestReadIndex --- PASS: TestReadIndex (0.00s) === RUN TestFindIndices --- PASS: TestFindIndices (0.00s) === RUN FuzzWalkS3 fuzz: elapsed: 0s, gathering baseline coverage: 0/47 completed failure while testing seed corpus entry: FuzzWalkS3/0345f16af6907ab1 fuzz: elapsed: 0s, gathering baseline coverage: 0/47 completed --- FAIL: FuzzWalkS3 (0.03s) --- FAIL: FuzzWalkS3 (0.00s) diff_test.go:323: Error Trace: /home/ariels/dev/lakeFS/pkg/local/diff_test.go:323 /home/ariels/sdk/go1.21.3/src/reflect/value.go:596 /home/ariels/sdk/go1.21.3/src/reflect/value.go:380 /home/ariels/sdk/go1.21.3/src/testing/fuzz.go:335 Error: Not equal: expected: []string{"imported 0", "imported/0"} actual : []string{"imported/0", "imported 0"} Diff: --- Expected +++ Actual @@ -1,4 +1,4 @@ ([]string) (len=2) { - (string) (len=10) "imported 0", - (string) (len=10) "imported/0" + (string) (len=10) "imported/0", + (string) (len=10) "imported 0" } Test: FuzzWalkS3 === NAME FAIL exit status 1 FAIL github.com/treeverse/lakefs/pkg/local 0.038s ``` Note that Go only finds a _first_ bug - but at least it tries to minimize it.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
I believe further testing is required. So I pushed some fuzzing code; I think it found a bug. I guess we need to fix either the fuzzer or the code. But I would like us to have some fuzzing here.
@arielshaqed Thanks for the test! I'm reverting the previous change as this is the root cause of the bug! |
@arielshaqed added a priority queue and more unit tests |
…kectl-local-diff-slowness-7643
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Thanks, neat!
Let's also fuzz for ~30 minutes.
Die("failed to stat source", 1) | ||
} | ||
|
||
if !recursive || !stat.IsDir() { // Ignore recursive if source is a file and not a directory |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
I don't understand the code, but I didn't go through all the nooks and crannies. Suppose !recursive && stat.IsDir()
. Then we don't perform this single-object upload, and instead go to l. 53 which says "try recursive upload". Do we need to fix the comment, or will this really recursively upload directories just because they're directories?
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
If !recursive && stat.IsDir()
we will get the error:
read <path>: is a directory
We can explicitly check this matrix and fail if this makes it more clear.
require.Contains(t, sanitizedResult, "Downloaded: 0") | ||
require.Contains(t, sanitizedResult, "Uploaded: 1") | ||
require.Contains(t, sanitizedResult, "Removed: 0") | ||
RunCmdAndVerifySuccessWithFile(t, Lakectl()+" fs upload --recursive -s files/ro_1k lakefs://"+repoName+"/"+mainBranch+"/"+vars["FILE_PATH"]+" -s files/ro_1k", false, "lakectl_fs_upload", vars) |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
However we answer my above misunderstanding, I'd appreciate a test that a non-recursive upload of a directory does... whatever it should.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
It does - it fails :)
Adding a test to cover this scenario 👍🏽 (we have a similar scenario that expects failure on the server side)
return nil | ||
} | ||
if info.IsDir() { | ||
// TODO: We don't return dir results for the listing, how will this effect directory markers, and can we even support directory markers? |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Can we open an issue for this?
pkg/local/diff.go
Outdated
} | ||
if info.IsDir() { | ||
// TODO: We don't return dir results for the listing, how will this effect directory markers, and can we even support directory markers? | ||
// Save encountered directories in a sorted queue and compare them with the first appearance of an file in that level |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
// Save encountered directories in a sorted queue and compare them with the first appearance of an file in that level | |
// Save encountered directories in a priority queue and compare them with the first appearance of an file in that level |
pkg/local/diff.go
Outdated
return filepath.SkipDir | ||
} | ||
|
||
heap.Init(&dirQueue) |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Why do we need this call? This is a really inefficient call, it takes time linear in the size of heap
. When using a heap as a priority queue it is not required: the heap invariants are always kept.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
You are right, it's not needed
pkg/local/diff.go
Outdated
dir := heap.Pop(&dirQueue).(string) | ||
if p > dir { | ||
if err = WalkS3(dir, callbackFunc); err != nil { | ||
return err | ||
} | ||
} else { // Put it back in its place | ||
heap.Push(&dirQueue, dir) | ||
break | ||
} |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
This adds a
pkg/local/diff.go
Outdated
} | ||
|
||
heap.Init(&dirQueue) | ||
for dirQueue.Len() > 0 { |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
We can and should run this loop even for a directory, so before l. 203; you'd only have to compare the "normalized" name, which is p
for a file and p+"/"
for a directory. That would keep the heap smaller, which is always helpful.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
I'm not sure I understand the comment. That's exactly what this code is doing
pkg/local/sorted_queue.go
Outdated
package local | ||
|
||
// A SortedQueue implements heap.Interface and holds strings. | ||
type SortedQueue []string |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
So why do all the methods use any
and not string
?
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Required by the heap interface
pkg/local/sorted_queue.go
Outdated
return (*pq)[i] < (*pq)[j] | ||
} | ||
|
||
func (pq *SortedQueue) Swap(i, j int) { |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
As above, you could add
func (pq *SortedQueue) Swap(i, j int) { | |
func (pq *SortedQueue) Top() string { | |
return pq[0] | |
} | |
func (pq *SortedQueue) Swap(i, j int) { |
package heap explicitly says that "The minimum element in the tree is the root, at index 0. "
pkg/local/sorted_queue.go
Outdated
package local | ||
|
||
// A SortedQueue implements heap.Interface and holds strings. | ||
type SortedQueue []string |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
It's a PriorityQueue :-/ Let's rename after everything else is done, otherwise GitHub becomes a mess.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Modified the type name, I'll modify the file name when we're finished
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
(Not approving because of some minor issues with recursive flag behaviour and efficiency.)
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Thanks! Looks really useful and fast.
Before committing, please do clean up the string-heap interface: the word "string" appears in the type name, so the word "any" should not appear in any type signature.
Also a comment on early-exits, but that one is your call.
pkg/local/diff.go
Outdated
if p > dir { | ||
heap.Pop(&stringHeap) // remove from queue |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Can we use stringHeap.Pop
here (and below)?
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
We can't because it is the heap's Pop and Push logic that actually re-organizes the tree as part of the operation. the stringHeap object itself is rather "dumb".
We have to maintain the interface which returns any
, otherwise we could not use the heap functionality
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
pkg/local/sorted_queue.go
Outdated
// A SortedQueue implements heap.Interface and holds strings. | ||
type SortedQueue []string | ||
// A StringHeap is a min-heap of strings | ||
type StringHeap []string |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
I don't understand why any method of this object ever takes an any
argument: it holds only string
s, which is fine. Why do we want to cast?
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
We have to maintain the interface which returns any
, otherwise we could not use the heap functionality
See: https://pkg.go.dev/container/heap#Interface
pkg/local/diff.go
Outdated
} else { | ||
break | ||
} |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
As always, I would prefer to reverse test, write line 212 as
if p <= dir {
break
}
and lose a level of indentation.
(This is of course not required)
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Done!
Closes #7643
Change Description
Background
lakectl local had a bug due to the different way standard FS lists files recursively in comparison with object stores (S3).
A fix was introduced which used the local adapter file walker to be used also for lakectl local, in order to diff between local and remote.
Due to the mentioned change, the local diff performance was greatly impacted.
Bug Fix
Improve local diff performance by creating a new walk functions that simulates the order in which object stores list files.
Relying on the FS walk implementation and modifying the behavior when iterating over directories
Testing Details
Manually testing the scenario of ~500K file diff. Operation time was reduced from 18s to 3s
Existing tests verify correctness was not broken
Breaking Change?
No
Additional info
before:
after: