From 6549c9ad127f801fb3e9a1bf869ad7868eb5dc23 Mon Sep 17 00:00:00 2001 From: Ivan Giuliani Date: Tue, 27 Oct 2020 11:49:38 +0000 Subject: [PATCH 1/3] Limit the resultset size of list In practice, this is already happening with GCS where result sets for `list` are implicitly capped to 1000 items. This commit allows that value to be changed and also adds the same behaviour to other adapters. --- lib/file_storage/disk.rb | 5 +++-- lib/file_storage/gcs.rb | 7 +++++-- lib/file_storage/in_memory.rb | 7 +++++-- lib/file_storage/key_storage.rb | 8 +++++++- spec/file_storage/disk_spec.rb | 13 ++++++++++--- spec/file_storage/in_memory_spec.rb | 17 +++++++++++++---- 6 files changed, 43 insertions(+), 14 deletions(-) diff --git a/lib/file_storage/disk.rb b/lib/file_storage/disk.rb index f976914..e82e4c5 100644 --- a/lib/file_storage/disk.rb +++ b/lib/file_storage/disk.rb @@ -33,13 +33,14 @@ def download(bucket:, key:) end end - def list(bucket:, key:) + def list(bucket:, key:, page_size:) root = Pathname.new(bucket_root(bucket)) matching_keys = Dir["#{root}/**/*"]. reject { |absolute_path| File.directory?(absolute_path) }. map { |full_path| Pathname.new(full_path).relative_path_from(root).to_s }. - select { |f| f.start_with?(key) } + select { |f| f.start_with?(key) }. + first(page_size) { bucket: bucket, diff --git a/lib/file_storage/gcs.rb b/lib/file_storage/gcs.rb index 3dcdf8f..c2cc949 100644 --- a/lib/file_storage/gcs.rb +++ b/lib/file_storage/gcs.rb @@ -42,8 +42,11 @@ def download(bucket:, key:) } end - def list(bucket:, key:) - matching_keys = get_bucket(bucket).files(prefix: key).map(&:name) + def list(bucket:, key:, page_size:) + matching_keys = get_bucket(bucket). + files(prefix: key, max: page_size). + map(&:name) + { bucket: bucket, keys: matching_keys, diff --git a/lib/file_storage/in_memory.rb b/lib/file_storage/in_memory.rb index 47fb9fe..4c0b788 100644 --- a/lib/file_storage/in_memory.rb +++ b/lib/file_storage/in_memory.rb @@ -41,8 +41,11 @@ def download(bucket:, key:) } end - def list(bucket:, key:) - matching_keys = @buckets[bucket].keys.select { |k| k.start_with?(key) } + def list(bucket:, key:, page_size:) + matching_keys = @buckets[bucket].keys. + select { |k| k.start_with?(key) }. + first(page_size) + { bucket: bucket, keys: matching_keys, diff --git a/lib/file_storage/key_storage.rb b/lib/file_storage/key_storage.rb index 8666ad4..89339b3 100644 --- a/lib/file_storage/key_storage.rb +++ b/lib/file_storage/key_storage.rb @@ -84,17 +84,23 @@ def upload!(content) # This will return a list of valid keys in the format of `adapter://bucket/key`. The keys in # the list will share the reference key as a prefix. # + # @param [Integer] page_size + # the number of items to be returned in the call. Note that if the `page_size` is + # smaller than the available keys for the given URI, there's no guarantee on the + # ordering upon which the keys will be returned and it's possible for two calls on + # the same URI and same `page_size` to return different result sets. # @return [Array] A list of keys in the format of `adapter://bucket/key` # # @example List all files under a given prefix # FileStorage.for("inmemory://bucket/prefix").list - def list + def list(page_size: 1000) FileStorage.logger.info(event: "key_storage.list_started") start = FileStorage::Timing.monotonic_now result = adapter.list( bucket: bucket, key: key, + page_size: page_size, ) FileStorage.logger.info(resource_count: result[:keys].count, diff --git a/spec/file_storage/disk_spec.rb b/spec/file_storage/disk_spec.rb index e3663e0..e470047 100644 --- a/spec/file_storage/disk_spec.rb +++ b/spec/file_storage/disk_spec.rb @@ -95,7 +95,7 @@ describe "#list" do context "when the bucket is empty" do it "returns an empty list" do - expect(instance.list(bucket: bucket, key: "whatever")).to eq( + expect(instance.list(bucket: bucket, key: "whatever", page_size: 1000)).to eq( bucket: bucket, keys: [], ) @@ -113,7 +113,7 @@ context "and we provide a matching prefix" do it "returns only the matching items" do - expect(instance.list(bucket: bucket, key: "2019-01")).to match( + expect(instance.list(bucket: bucket, key: "2019-01", page_size: 1000)).to match( bucket: bucket, keys: match_array(%w[2019-01/hello1 2019-01/hello2 2019-01/hello3]), ) @@ -122,12 +122,19 @@ context "when the prefix doesn't match anything" do it "returns an empty list" do - expect(instance.list(bucket: bucket, key: "YOLO")).to match( + expect(instance.list(bucket: bucket, key: "YOLO", page_size: 1000)).to match( bucket: bucket, keys: [], ) end end + + it "returns a subset of the matching keys" do + expect(instance.list(bucket: bucket, key: "2019-01", page_size: 2)).to match( + bucket: bucket, + keys: have_attributes(length: 2), + ) + end end end diff --git a/spec/file_storage/in_memory_spec.rb b/spec/file_storage/in_memory_spec.rb index aa5feba..ead0013 100644 --- a/spec/file_storage/in_memory_spec.rb +++ b/spec/file_storage/in_memory_spec.rb @@ -56,7 +56,7 @@ context "but we try to fetch it from a different bucket" do it "raises an error" do - expect(instance.list(bucket: bucket, key: "whatever")).to eq( + expect(instance.list(bucket: bucket, key: "whatever", page_size: 1000)).to eq( bucket: bucket, keys: [], ) @@ -68,7 +68,7 @@ describe "#list" do context "when the bucket is empty" do it "returns an empty list" do - expect(instance.list(bucket: bucket, key: "whatever")).to eq( + expect(instance.list(bucket: bucket, key: "whatever", page_size: 1000)).to eq( bucket: bucket, keys: [], ) @@ -86,7 +86,7 @@ context "and we provide a matching prefix" do it "returns only the matching items" do - expect(instance.list(bucket: bucket, key: "2019-01")).to match( + expect(instance.list(bucket: bucket, key: "2019-01", page_size: 1000)).to match( bucket: bucket, keys: match_array(%w[2019-01/hello1 2019-01/hello2 2019-01/hello3]), ) @@ -95,12 +95,21 @@ context "when the prefix doesn't match anything" do it "returns an empty list" do - expect(instance.list(bucket: bucket, key: "YOLO")).to match( + expect(instance.list(bucket: bucket, key: "YOLO", page_size: 1000)).to match( bucket: bucket, keys: [], ) end end + + context "and we request fewer keys than they are available" do + it "returns a subset of the matching keys" do + expect(instance.list(bucket: bucket, key: "2019-01", page_size: 2)).to match( + bucket: bucket, + keys: have_attributes(length: 2), + ) + end + end end end From e23e545fa92e213170f2dfd50ef841252da591ef Mon Sep 17 00:00:00 2001 From: Ivan Giuliani Date: Tue, 27 Oct 2020 14:48:53 +0000 Subject: [PATCH 2/3] Have all adapters support pagination of results This is still not exposed to the outside callers, but all the adapters from now on must be able to paginate through a list of results. This is done by always returning an enumerable back to the main class. --- lib/file_storage/disk.rb | 15 +++++++------ lib/file_storage/gcs.rb | 21 +++++++++++------ lib/file_storage/in_memory.rb | 15 +++++++------ lib/file_storage/key_storage.rb | 4 ++-- spec/file_storage/disk_spec.rb | 28 ++++++++++++++--------- spec/file_storage/in_memory_spec.rb | 35 ++++++++++++++++------------- 6 files changed, 70 insertions(+), 48 deletions(-) diff --git a/lib/file_storage/disk.rb b/lib/file_storage/disk.rb index e82e4c5..16543f4 100644 --- a/lib/file_storage/disk.rb +++ b/lib/file_storage/disk.rb @@ -36,16 +36,17 @@ def download(bucket:, key:) def list(bucket:, key:, page_size:) root = Pathname.new(bucket_root(bucket)) - matching_keys = Dir["#{root}/**/*"]. + Dir["#{root}/**/*"]. reject { |absolute_path| File.directory?(absolute_path) }. map { |full_path| Pathname.new(full_path).relative_path_from(root).to_s }. select { |f| f.start_with?(key) }. - first(page_size) - - { - bucket: bucket, - keys: matching_keys, - } + each_slice(page_size). + map do |keys| + { + bucket: bucket, + keys: keys, + } + end.to_enum end def delete!(bucket:, key:) diff --git a/lib/file_storage/gcs.rb b/lib/file_storage/gcs.rb index c2cc949..4d8c969 100644 --- a/lib/file_storage/gcs.rb +++ b/lib/file_storage/gcs.rb @@ -43,14 +43,21 @@ def download(bucket:, key:) end def list(bucket:, key:, page_size:) - matching_keys = get_bucket(bucket). - files(prefix: key, max: page_size). - map(&:name) + Enumerator.new do |yielder| + token = nil - { - bucket: bucket, - keys: matching_keys, - } + loop do + page = get_bucket(bucket).files(prefix: key, max: page_size, token: token) + yielder.yield({ + bucket: bucket, + keys: page.map(&:name), + }) + + break if page.token.nil? + + token = page.token + end + end end def delete!(bucket:, key:) diff --git a/lib/file_storage/in_memory.rb b/lib/file_storage/in_memory.rb index 4c0b788..58759a2 100644 --- a/lib/file_storage/in_memory.rb +++ b/lib/file_storage/in_memory.rb @@ -42,14 +42,15 @@ def download(bucket:, key:) end def list(bucket:, key:, page_size:) - matching_keys = @buckets[bucket].keys. + @buckets[bucket].keys. select { |k| k.start_with?(key) }. - first(page_size) - - { - bucket: bucket, - keys: matching_keys, - } + each_slice(page_size). + map do |keys| + { + bucket: bucket, + keys: keys, + } + end.to_enum end def delete!(bucket:, key:) diff --git a/lib/file_storage/key_storage.rb b/lib/file_storage/key_storage.rb index 89339b3..3a30620 100644 --- a/lib/file_storage/key_storage.rb +++ b/lib/file_storage/key_storage.rb @@ -101,13 +101,13 @@ def list(page_size: 1000) bucket: bucket, key: key, page_size: page_size, - ) + ).first FileStorage.logger.info(resource_count: result[:keys].count, event: "key_storage.list_finished", duration: FileStorage::Timing.monotonic_now - start) - result[:keys].map { |key| "#{adapter_type}://#{result[:bucket]}/#{key}" } + result.fetch(:keys, []).map { |key| "#{adapter_type}://#{result[:bucket]}/#{key}" } end # Deletes the referenced key. diff --git a/spec/file_storage/disk_spec.rb b/spec/file_storage/disk_spec.rb index e470047..c9b2ab3 100644 --- a/spec/file_storage/disk_spec.rb +++ b/spec/file_storage/disk_spec.rb @@ -95,10 +95,7 @@ describe "#list" do context "when the bucket is empty" do it "returns an empty list" do - expect(instance.list(bucket: bucket, key: "whatever", page_size: 1000)).to eq( - bucket: bucket, - keys: [], - ) + expect(instance.list(bucket: bucket, key: "whatever", page_size: 1000).to_a).to eq([]) end end @@ -113,7 +110,7 @@ context "and we provide a matching prefix" do it "returns only the matching items" do - expect(instance.list(bucket: bucket, key: "2019-01", page_size: 1000)).to match( + expect(instance.list(bucket: bucket, key: "2019-01", page_size: 1000).first).to match( bucket: bucket, keys: match_array(%w[2019-01/hello1 2019-01/hello2 2019-01/hello3]), ) @@ -122,19 +119,30 @@ context "when the prefix doesn't match anything" do it "returns an empty list" do - expect(instance.list(bucket: bucket, key: "YOLO", page_size: 1000)).to match( - bucket: bucket, - keys: [], - ) + expect(instance.list(bucket: bucket, key: "YOLO", page_size: 1000).to_a).to eq([]) end end it "returns a subset of the matching keys" do - expect(instance.list(bucket: bucket, key: "2019-01", page_size: 2)).to match( + expect(instance.list(bucket: bucket, key: "2019-01", page_size: 2).first).to match( bucket: bucket, keys: have_attributes(length: 2), ) end + + context "when there are multiple pages of results available" do + it "returns an enumerable" do + expect(instance.list(bucket: bucket, key: "2019-01", page_size: 1)). + to be_a_kind_of(Enumerable) + end + + it "enumerates through all the pages" do + expect(instance.list(bucket: bucket, key: "2019-01", page_size: 2).to_a).to match_array([ + { bucket: bucket, keys: have_attributes(length: 2) }, + { bucket: bucket, keys: have_attributes(length: 1) }, + ]) + end + end end end diff --git a/spec/file_storage/in_memory_spec.rb b/spec/file_storage/in_memory_spec.rb index ead0013..6588143 100644 --- a/spec/file_storage/in_memory_spec.rb +++ b/spec/file_storage/in_memory_spec.rb @@ -55,11 +55,8 @@ end context "but we try to fetch it from a different bucket" do - it "raises an error" do - expect(instance.list(bucket: bucket, key: "whatever", page_size: 1000)).to eq( - bucket: bucket, - keys: [], - ) + it "returns an empty list" do + expect(instance.list(bucket: bucket, key: "whatever", page_size: 1000).to_a).to eq([]) end end end @@ -68,10 +65,7 @@ describe "#list" do context "when the bucket is empty" do it "returns an empty list" do - expect(instance.list(bucket: bucket, key: "whatever", page_size: 1000)).to eq( - bucket: bucket, - keys: [], - ) + expect(instance.list(bucket: bucket, key: "whatever", page_size: 1000).to_a).to eq([]) end end @@ -86,7 +80,7 @@ context "and we provide a matching prefix" do it "returns only the matching items" do - expect(instance.list(bucket: bucket, key: "2019-01", page_size: 1000)).to match( + expect(instance.list(bucket: bucket, key: "2019-01", page_size: 1000).first).to match( bucket: bucket, keys: match_array(%w[2019-01/hello1 2019-01/hello2 2019-01/hello3]), ) @@ -95,21 +89,32 @@ context "when the prefix doesn't match anything" do it "returns an empty list" do - expect(instance.list(bucket: bucket, key: "YOLO", page_size: 1000)).to match( - bucket: bucket, - keys: [], - ) + expect(instance.list(bucket: bucket, key: "YOLO", page_size: 1000).to_a).to eq([]) end end context "and we request fewer keys than they are available" do it "returns a subset of the matching keys" do - expect(instance.list(bucket: bucket, key: "2019-01", page_size: 2)).to match( + expect(instance.list(bucket: bucket, key: "2019-01", page_size: 2).first).to match( bucket: bucket, keys: have_attributes(length: 2), ) end end + + context "when there are multiple pages of results available" do + it "returns an enumerable" do + expect(instance.list(bucket: bucket, key: "2019-01", page_size: 1)). + to be_a_kind_of(Enumerable) + end + + it "enumerates through all the pages" do + expect(instance.list(bucket: bucket, key: "2019-01", page_size: 2).to_a).to match_array([ + { bucket: bucket, keys: have_attributes(length: 2) }, + { bucket: bucket, keys: have_attributes(length: 1) }, + ]) + end + end end end From 2ed8c041bf8227b7ae7412afa8d5f692a70de4a6 Mon Sep 17 00:00:00 2001 From: Ivan Giuliani Date: Tue, 27 Oct 2020 17:28:11 +0000 Subject: [PATCH 3/3] Paginate through results This adds support for large result set to FileStorage, however the pagination is happening behind the scenes and not exposed to the caller which will only see the final result set via an enumerable). This is a breaking change as previously the full result set was a returned in a list. --- lib/file_storage/key_storage.rb | 45 +++++++++++++++++---------- spec/file_storage/key_storage_spec.rb | 4 +-- 2 files changed, 30 insertions(+), 19 deletions(-) diff --git a/lib/file_storage/key_storage.rb b/lib/file_storage/key_storage.rb index 3a30620..c8180ae 100644 --- a/lib/file_storage/key_storage.rb +++ b/lib/file_storage/key_storage.rb @@ -81,33 +81,44 @@ def upload!(content) # Lists all keys for the current adapter that have the reference key as prefix # - # This will return a list of valid keys in the format of `adapter://bucket/key`. The keys in - # the list will share the reference key as a prefix. + # Internally, this method will paginate through the result set. The default page size + # for the underlying adapter can be controlled via the `page_size` argument. # - # @param [Integer] page_size - # the number of items to be returned in the call. Note that if the `page_size` is - # smaller than the available keys for the given URI, there's no guarantee on the - # ordering upon which the keys will be returned and it's possible for two calls on - # the same URI and same `page_size` to return different result sets. - # @return [Array] A list of keys in the format of `adapter://bucket/key` + # This will return a enumerator of valid keys in the format of `adapter://bucket/key`. + # The keys in the list will share the reference key as a prefix. Underlying adapters will + # paginate the result set as the enumerable is consumed. The number of items per page + # can be controlled by the `page_size` argument. # - # @example List all files under a given prefix - # FileStorage.for("inmemory://bucket/prefix").list + # @param [Integer] page_size + # the max number of items to fetch for each page of results def list(page_size: 1000) FileStorage.logger.info(event: "key_storage.list_started") start = FileStorage::Timing.monotonic_now - result = adapter.list( + pages = adapter.list( bucket: bucket, key: key, page_size: page_size, - ).first - - FileStorage.logger.info(resource_count: result[:keys].count, - event: "key_storage.list_finished", - duration: FileStorage::Timing.monotonic_now - start) + ) - result.fetch(:keys, []).map { |key| "#{adapter_type}://#{result[:bucket]}/#{key}" } + page_count = 0 + Enumerator.new do |yielder| + pages.each do |page| + page_count += 1 + keys = page.fetch(:keys, []).map { |key| "#{adapter_type}://#{page[:bucket]}/#{key}" } + + FileStorage.logger.info( + event: "key_storage.list_page_fetched", + resource_count: keys.count, + page: page_count, + duration: FileStorage::Timing.monotonic_now - start, + ) + + keys.each do |key| + yielder.yield(key) + end + end + end end # Deletes the referenced key. diff --git a/spec/file_storage/key_storage_spec.rb b/spec/file_storage/key_storage_spec.rb index 181046f..f111b2d 100644 --- a/spec/file_storage/key_storage_spec.rb +++ b/spec/file_storage/key_storage_spec.rb @@ -46,10 +46,10 @@ def build_for(key) hash_including(event: "key_storage.list_started"), ) expect(FileStorage.logger).to receive(:info).with( - hash_including(event: "key_storage.list_finished"), + hash_including(event: "key_storage.list_page_fetched"), ) - build_for("inmemory://bucket").list + build_for("inmemory://bucket").list.to_a end context "but the URI does not have a trailing /" do