diff --git a/CHANGELOG.md b/CHANGELOG.md
index 337b458..1df43cf 100644
--- a/CHANGELOG.md
+++ b/CHANGELOG.md
@@ -1,3 +1,9 @@
+v0.6.0
+------
+- [BREAKING] Drop support for Ruby 2.6
+- Introduce support for streaming downloads
+
+
 v0.5.0
 ------
 - Support escaping of URI keys. Fixes #46.
diff --git a/docker-compose.yml b/docker-compose.yml
index eed352c..dbb3d0d 100644
--- a/docker-compose.yml
+++ b/docker-compose.yml
@@ -12,6 +12,7 @@ services:
       MINIO_REGION_NAME: us-east-1
     ports:
       - "9000:9000"
+      - "9001:9001"
   gcp-simulator:
     image: fsouza/fake-gcs-server
     hostname: gcp
diff --git a/lib/bucket_store/disk.rb b/lib/bucket_store/disk.rb
index 28b183a..f13312b 100644
--- a/lib/bucket_store/disk.rb
+++ b/lib/bucket_store/disk.rb
@@ -4,6 +4,8 @@ module BucketStore
   class Disk
+    DEFAULT_STREAM_CHUNK_SIZE_BYTES = 1024 * 1024 * 4 # 4Mb
+
     def self.build(base_dir = ENV["DISK_ADAPTER_BASE_DIR"])
       base_dir ||= Dir.tmpdir
       Disk.new(base_dir)
@@ -33,6 +35,25 @@ def download(bucket:, key:)
       end
     end
 
+    def stream_download(bucket:, key:, chunk_size: nil)
+      chunk_size ||= DEFAULT_STREAM_CHUNK_SIZE_BYTES
+
+      fd = File.open(key_path(bucket, key), "r")
+      metadata = {
+        bucket: bucket,
+        key: key,
+      }.freeze
+
+      Enumerator.new do |yielder|
+        loop do
+          v = fd.gets(chunk_size)
+          break if v.nil?
+
+          yielder.yield([metadata, v])
+        end
+      end
+    end
+
     def list(bucket:, key:, page_size:)
       root = Pathname.new(bucket_root(bucket))
 
diff --git a/lib/bucket_store/gcs.rb b/lib/bucket_store/gcs.rb
index 802345e..ba1436c 100644
--- a/lib/bucket_store/gcs.rb
+++ b/lib/bucket_store/gcs.rb
@@ -9,6 +9,8 @@ module BucketStore
   class Gcs
     DEFAULT_TIMEOUT_SECONDS = 30
 
+    DEFAULT_STREAM_CHUNK_SIZE_BYTES = 1024 * 1024 * 4 # 4Mb
+
     def self.build(timeout_seconds = DEFAULT_TIMEOUT_SECONDS)
       Gcs.new(timeout_seconds)
     end
@@ -56,6 +58,40 @@ def download(bucket:, key:)
       }
     end
 
+    def stream_download(bucket:, key:, chunk_size: nil)
+      chunk_size ||= DEFAULT_STREAM_CHUNK_SIZE_BYTES
+
+      metadata = {
+        bucket: bucket,
+        key: key,
+      }.freeze
+
+      file = get_bucket(bucket).file(key)
+      obj_size = file.size
+
+      Enumerator.new do |yielder|
+        start = 0
+        while start < obj_size
+          stop = [start + chunk_size, obj_size].min
+
+          # We simulate an enumerator-based streaming approach by using partial range
+          # downloads as there's no direct support for streaming downloads. The returned
+          # object is a StringIO, so we must `.rewind` before we can access it.
+          obj_io = file.download(range: start...stop)
+          obj_io&.rewind
+
+          # rubocop:disable Style/ZeroLengthPredicate
+          # StringIO does not define the `.empty?` method that rubocop is so keen on using
+          body = obj_io&.read
+          break if body.nil? || body.size.zero?
+          # rubocop:enable Style/ZeroLengthPredicate
+
+          start += body.size
+          yielder.yield([metadata, body])
+        end
+      end
+    end
+
     def list(bucket:, key:, page_size:)
       Enumerator.new do |yielder|
         token = nil
diff --git a/lib/bucket_store/in_memory.rb b/lib/bucket_store/in_memory.rb
index 6dfccc9..0fa2581 100644
--- a/lib/bucket_store/in_memory.rb
+++ b/lib/bucket_store/in_memory.rb
@@ -2,6 +2,8 @@ module BucketStore
   class InMemory
+    DEFAULT_STREAM_CHUNK_SIZE_BYTES = 1024 * 1024 * 4 # 4Mb
+
     def self.build
       InMemory.instance
     end
@@ -41,6 +43,25 @@ def download(bucket:, key:)
       }
     end
 
+    def stream_download(bucket:, key:, chunk_size: nil)
+      chunk_size ||= DEFAULT_STREAM_CHUNK_SIZE_BYTES
+
+      content_stream = StringIO.new(@buckets[bucket].fetch(key))
+      metadata = {
+        bucket: bucket,
+        key: key,
+      }.freeze
+
+      Enumerator.new do |yielder|
+        loop do
+          v = content_stream.read(chunk_size)
+          break if v.nil?
+
+          yielder.yield([metadata, v])
+        end
+      end
+    end
+
     def list(bucket:, key:, page_size:)
       @buckets[bucket].keys.
         select { |k| k.start_with?(key) }.
diff --git a/lib/bucket_store/key_storage.rb b/lib/bucket_store/key_storage.rb
index 4287d95..f3e7acc 100644
--- a/lib/bucket_store/key_storage.rb
+++ b/lib/bucket_store/key_storage.rb
@@ -15,6 +15,59 @@ class KeyStorage
       disk: Disk,
     }.freeze
 
+    # Defines a streaming interface for download and upload operations.
+    #
+    # Note that individual adapters may require additional configuration for the correct
+    # behavior of the streaming interface.
+    class KeyStreamer
+      attr_reader :bucket, :key, :adapter
+
+      def initialize(adapter:, bucket:, key:)
+        @adapter = adapter
+        @bucket = bucket
+        @key = key
+      end
+
+      # Streams the content of the reference key
+      #
+      # @param [optional, Integer] chunk_size The maximum size of individual chunks.
+      #   Note that adapters will only return at most the given size, but could
+      #   return a smaller chunk when needed.
+      #
+      # @return [Enumerator]
+      #   An enumerator where each item is a pair containing the key's metadata and a chunk of
+      #   the downloaded result. The metadata is compatible with what is returned by the
+      #   non-streaming version of the `download` method, however the content of each item is
+      #   limited in size.
+      # @see KeyStorage#download
+      # @example Download a key
+      #   BucketStore.for("inmemory://bucket/file.xml").stream.download
+      def download(chunk_size: nil)
+        if !chunk_size.nil? && chunk_size <= 0
+          raise ArgumentError, "Chunk size must be > 0 when specified"
+        end
+
+        BucketStore.logger.info(event: "key_storage.stream.download_started")
+
+        start = BucketStore::Timing.monotonic_now
+        result = adapter.stream_download(
+          bucket: bucket,
+          key: key,
+          chunk_size: chunk_size,
+        )
+
+        BucketStore.logger.info(event: "key_storage.stream.download_prepared",
+                                duration: BucketStore::Timing.monotonic_now - start)
+
+        result
+      end
+
+      def upload
+        raise NotImplementedError
+      end
+    end
+
     attr_reader :bucket, :key, :adapter_type
 
     def initialize(adapter:, bucket:, key:)
@@ -53,6 +106,15 @@ def download
       result
     end
 
+    # Returns an interface for streaming operations
+    #
+    # @return [KeyStreamer] An interface for streaming operations
+    def stream
+      raise ArgumentError, "Key cannot be empty" if key.empty?
+
+      KeyStreamer.new(adapter: adapter, bucket: bucket, key: key)
+    end
+
     # Uploads the given content to the reference key location.
     #
     # If the `key` already exists, its content will be replaced by the one in input.
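For orientation, the `KeyStorage#stream` interface introduced above can be exercised as in the following minimal sketch. This is an illustration rather than part of the patch: the `inmemory://` URI, the key name and the output path are assumptions.

    require "bucket_store"

    key = BucketStore.for("inmemory://bucket/report.csv")
    key.upload!("some,large,content\n" * 100_000)

    # `stream.download` returns an Enumerator of [metadata, chunk] pairs, so the
    # object never has to be held in memory in one piece.
    File.open("/tmp/report.csv", "wb") do |file|
      key.stream.download.each do |metadata, chunk|
        # metadata is a frozen hash such as { bucket: "bucket", key: "report.csv" }
        file.write(chunk)
      end
    end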
diff --git a/lib/bucket_store/s3.rb b/lib/bucket_store/s3.rb
index 393129a..ffa24a2 100644
--- a/lib/bucket_store/s3.rb
+++ b/lib/bucket_store/s3.rb
@@ -8,6 +8,8 @@ module BucketStore
   class S3
     DEFAULT_TIMEOUT_SECONDS = 30
 
+    DEFAULT_STREAM_CHUNK_SIZE_BYTES = 1024 * 1024 * 4 # 4Mb
+
     def self.build(open_timeout_seconds = DEFAULT_TIMEOUT_SECONDS,
                    read_timeout_seconds = DEFAULT_TIMEOUT_SECONDS)
       S3.new(open_timeout_seconds, read_timeout_seconds)
@@ -46,6 +48,50 @@ def download(bucket:, key:)
       }
     end
 
+    def stream_download(bucket:, key:, chunk_size: nil)
+      chunk_size ||= DEFAULT_STREAM_CHUNK_SIZE_BYTES
+
+      metadata = {
+        bucket: bucket,
+        key: key,
+      }.freeze
+
+      obj_size = storage.head_object(bucket: bucket, key: key)&.content_length || 0
+
+      Enumerator.new do |yielder|
+        start = 0
+        while start < obj_size
+          stop = [start + chunk_size - 1, obj_size].min
+
+          # S3 only supports streaming writes to an IO object (e.g. a file or StringIO),
+          # but that means we can't access the content of the downloaded chunk in-memory.
+          # Additionally, the block-based support in the sdk also doesn't support retries,
+          # which could lead to file corruption.
+          # (see https://aws.amazon.com/blogs/developer/downloading-objects-from-amazon-s3-using-the-aws-sdk-for-ruby/)
+          #
+          # We simulate an enumerator-based streaming approach by using partial range
+          # downloads instead. There are no helper methods for range downloads in the Ruby
+          # SDK, so we have to build our own range query.
+          # Range is specified in the same format as the HTTP range header (see
+          # https://www.rfc-editor.org/rfc/rfc9110.html#name-range)
+          obj = storage.get_object(
+            bucket: bucket,
+            key: key,
+            range: "bytes=#{start}-#{stop}",
+          )
+
+          # rubocop:disable Style/ZeroLengthPredicate
+          # StringIO does not define the `.empty?` method that rubocop is so keen on using
+          body = obj&.body&.read
+          break if body.nil? || body.size.zero?
+          # rubocop:enable Style/ZeroLengthPredicate
+
+          start += body.size
+          yielder.yield([metadata, body])
+        end
+      end
+    end
+
     def list(bucket:, key:, page_size:)
       Enumerator.new do |yielder|
         page = storage.list_objects_v2(bucket: bucket, prefix: key, max_keys: page_size)
diff --git a/spec/bucket_store/key_storage_spec.rb b/spec/bucket_store/key_storage_spec.rb
index ef3c993..480dd1f 100644
--- a/spec/bucket_store/key_storage_spec.rb
+++ b/spec/bucket_store/key_storage_spec.rb
@@ -154,4 +154,31 @@ def build_for(key)
       expect(build_for("inmemory://bucket/prefix/a").exists?).to be false
     end
   end
+
+  describe "#stream" do
+    let!(:large_file_content) { "Z" * 1024 * 1024 * 10 } # 10Mb
+
+    before do
+      build_for("inmemory://bucket/small").upload!("hello world")
+      build_for("inmemory://bucket/large").upload!(large_file_content)
+    end
+
+    describe "#download" do
+      it "returns a single chunk for small files" do
+        expect(build_for("inmemory://bucket/small").stream.download).to contain_exactly([
+          { bucket: "bucket", key: "small" }, an_instance_of(String)
+        ])
+      end
+
+      it "returns the file content in chunks for larger files" do
+        rebuilt =
+          build_for("inmemory://bucket/large").stream.download.map do |metadata, chunk|
+            expect(metadata).to eq({ bucket: "bucket", key: "large" })
+            chunk
+          end.join
+
+        expect(rebuilt).to eq(large_file_content)
+      end
+    end
+  end
 end
diff --git a/spec/bucket_store_integration_spec.rb b/spec/bucket_store_integration_spec.rb
index c808615..428ddf7 100644
--- a/spec/bucket_store_integration_spec.rb
+++ b/spec/bucket_store_integration_spec.rb
@@ -24,55 +24,100 @@
 end
 
 shared_examples "adapter integration" do |base_bucket_uri|
-  # This is presented as a single idempotent test as otherwise resetting state between execution
-  # makes things very complicated with no huge benefits.
-
-  it "has a consistent interface" do
-    # Write 201 files
-    file_list = []
-    201.times do |i|
-      filename = "file#{(i + 1).to_s.rjust(3, '0')}.txt"
-      file_list << filename
-
-      # the body of the file is the filename itself
-      described_class.for("#{base_bucket_uri}/prefix/#{filename}").upload!(filename)
+  context "using #{base_bucket_uri}" do
+    before do
+      described_class.for(base_bucket_uri).list.each do |path|
+        described_class.for(path).delete!
+      end
     end
 
-    # Add some files with spaces
-    described_class.for("#{base_bucket_uri}/prefix/i have a space.txt").
-      upload!("i have a space.txt")
-    described_class.for("#{base_bucket_uri}/prefix/another space.txt").
-      upload!("another space.txt")
-
-    file_list << "i have a space.txt"
-    file_list << "another space.txt"
-
-    # List with prefix should only return the matching files
-    expect(described_class.for("#{base_bucket_uri}/prefix/file1").list.to_a.size).to eq(100)
-    expect(described_class.for("#{base_bucket_uri}/prefix/file2").list.to_a.size).to eq(2)
-    expect(described_class.for("#{base_bucket_uri}/prefix/").list.to_a.size).to eq(203)
-
-    # List (without prefixes) should return everything
-    expect(described_class.for(base_bucket_uri.to_s).list.to_a).
-      to match_array(file_list.map { |filename| "#{base_bucket_uri}/prefix/#{filename}" })
-
-    # We know the content of the file, we can check `.download` returns it as expected
-    all_files = file_list.map do |filename|
-      [filename, "#{base_bucket_uri}/prefix/#{filename}"]
+    it "returns an empty bucket when no files are uploaded" do
+      expect(described_class.for(base_bucket_uri.to_s).list.to_a.size).to eq(0)
     end
-    all_files.each do |content, key|
-      expect(described_class.for(key).download[:content]).to eq(content)
+
+    it "has a consistent interface" do
+      # Write 201 files
+      file_list = []
+      201.times do |i|
+        filename = "file#{(i + 1).to_s.rjust(3, '0')}.txt"
+        file_list << filename
+
+        # the body of the file is the filename itself
+        described_class.for("#{base_bucket_uri}/prefix/#{filename}").upload!(filename)
+      end
+
+      # Add some files with spaces
+      described_class.for("#{base_bucket_uri}/prefix/i have a space.txt").
+        upload!("i have a space.txt")
+      described_class.for("#{base_bucket_uri}/prefix/another space.txt").
+        upload!("another space.txt")
+
+      file_list << "i have a space.txt"
+      file_list << "another space.txt"
+
+      # List with prefix should only return the matching files
+      expect(described_class.for("#{base_bucket_uri}/prefix/file1").list.to_a.size).to eq(100)
+      expect(described_class.for("#{base_bucket_uri}/prefix/file2").list.to_a.size).to eq(2)
+      expect(described_class.for("#{base_bucket_uri}/prefix/").list.to_a.size).to eq(203)
+
+      # List (without prefixes) should return everything
+      expect(described_class.for(base_bucket_uri.to_s).list.to_a).
+        to match_array(file_list.map { |filename| "#{base_bucket_uri}/prefix/#{filename}" })
+
+      # We know the content of the file, we can check `.download` returns it as expected
+      all_files = file_list.map do |filename|
+        [filename, "#{base_bucket_uri}/prefix/#{filename}"]
+      end
+      all_files.each do |content, key|
+        expect(described_class.for(key).download[:content]).to eq(content)
+      end
+
+      # Delete all the files, the bucket should be empty afterwards
+      described_class.for(base_bucket_uri.to_s).list.each do |key|
+        described_class.for(key).delete!
+      end
+      expect(described_class.for(base_bucket_uri.to_s).list.to_a.size).to eq(0)
     end
 
-    # Delete all the files, the bucket should be empty afterwards
-    file_list.map { |filename| "#{base_bucket_uri}/prefix/#{filename}" }.each do |key|
-      described_class.for(key).delete!
+    context "using the streaming interface" do
+      it "supports large file downloads" do
+        # Upload a large file
+        large_file_content = "Z" * 1024 * 1024 * 10 # 10Mb
+        described_class.
+          for("#{base_bucket_uri}/large.txt").
+          upload!(large_file_content)
+
+        # Streaming downloads should return a chunked response
+        rebuilt_large_file =
+          described_class.for("#{base_bucket_uri}/large.txt").
+            stream.
+            download.
+            map { |_meta, chunk| chunk }.
+            join
+
+        expect(rebuilt_large_file.size).to eq(large_file_content.size)
+        expect(rebuilt_large_file).to eq(large_file_content)
+      end
+
+      it "allows downloads of individual small chunks" do
+        described_class.
+          for("#{base_bucket_uri}/large.txt").
+          upload!("1234567890")
+
+        chunks = described_class.for("#{base_bucket_uri}/large.txt").
+          stream.
+          download(chunk_size: 1).
+          to_a
+
+        expect(chunks.size).to eq(10)
+        expect(chunks.map { |_meta, chunk| chunk }).to match_array(
+          %w[1 2 3 4 5 6 7 8 9 0],
+        )
+      end
     end
-    expect(described_class.for(base_bucket_uri.to_s).list.to_a.size).to eq(0)
   end
 end
 
-# We don't test GCS as there's no sensible way of running a local simulator
 include_examples "adapter integration", "inmemory://bucket"
 include_examples "adapter integration", "disk://bucket"
 include_examples "adapter integration", "s3://bucket"
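The integration specs above drive the same interface end to end; the `chunk_size` argument bounds how much of an object is materialised per iteration, and adapters may return chunks smaller than the requested size. A minimal sketch of bounded-memory post-processing, assuming an `s3://bucket/large.txt` key and a 1 MiB chunk size (both illustrative, not part of the diff):

    require "digest"
    require "bucket_store"

    key = BucketStore.for("s3://bucket/large.txt")

    # Fold the object into a checksum one chunk at a time; only a single chunk
    # is ever held in memory.
    digest = Digest::SHA256.new
    key.stream.download(chunk_size: 1024 * 1024).each do |_metadata, chunk|
      digest.update(chunk)
    end

    puts digest.hexdigest

Passing a non-positive `chunk_size` raises an ArgumentError ("Chunk size must be > 0 when specified").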