From 384fab5dc05ab3659ddfaa6baf2aa5d9416fd954 Mon Sep 17 00:00:00 2001 From: Andrew Morton Date: Tue, 18 Apr 2023 17:51:40 +0100 Subject: [PATCH] Add KeyStreamer interface for uploads and downloads. Completely stolen from https://github.com/gocardless/bucket-store/pull/63. The KeyStreamer expects uploads / downloads to be expressed in terms of IO operations. For example a download is actually `download into this IO` and upload is `upload the content from this file`. Uploads / downloads of Strings are considered a special case of this. However is distinct enough that we have streaming be it's own API. --- lib/bucket_store/key_storage.rb | 131 ++++++++++++++++++-------- spec/bucket_store/key_storage_spec.rb | 92 +++++++++++++++++- 2 files changed, 181 insertions(+), 42 deletions(-) diff --git a/lib/bucket_store/key_storage.rb b/lib/bucket_store/key_storage.rb index 17709a7..f4c4a7d 100644 --- a/lib/bucket_store/key_storage.rb +++ b/lib/bucket_store/key_storage.rb @@ -15,6 +15,88 @@ class KeyStorage disk: Disk, }.freeze + # Defines a streaming interface for download and upload operations. + # + # Note that individual adapters may require additional configuration for the correct + # behavior of the streaming interface. + class KeyStreamer + attr_reader :bucket, :key, :adapter, :adapter_type + + def initialize(adapter:, adapter_type:, bucket:, key:) + @adapter = adapter + @adapter_type = adapter_type + @bucket = bucket + @key = key + end + + # Streams the content of the reference key into a File like object + # + # @return hash containing the bucket, the key and file like object passed in as input + # + # @see KeyStorage#download + # @example Download a key + # buffer = StringIO.new + # BucketStore.for("inmemory://bucket/file.xml").stream.download(file: buffer) + # buffer.string == "Imagine I'm a 2GB file" + def download(file:) + BucketStore.logger.info(event: "key_storage.stream.download_started") + + start = BucketStore::Timing.monotonic_now + adapter.download( + bucket: bucket, + key: key, + file: file, + ) + + BucketStore.logger.info(event: "key_storage.stream.download_finished", + duration: BucketStore::Timing.monotonic_now - start) + + { + bucket: bucket, + key: key, + file: file, + } + end + + # Performs a streaming upload to the backing object store + # + # @return the generated key for the new object + # + # @see KeyStorage#upload! + # @example Upload a key + # buffer = StringIO.new("Imagine I'm a 2GB file") + # BucketStore.for("inmemory://bucket/file.xml").stream.upload!(file: buffer) + def upload!(file:) + raise ArgumentError, "Key cannot be empty" if key.empty? + + BucketStore.logger.info(event: "key_storage.stream.upload_started", + **log_context) + + start = BucketStore::Timing.monotonic_now + adapter.upload!( + bucket: bucket, + key: key, + file: file, + ) + + BucketStore.logger.info(event: "key_storage.stream.upload_finished", + duration: BucketStore::Timing.monotonic_now - start, + **log_context) + + "#{adapter_type}://#{bucket}/#{key}" + end + + private + + def log_context + { + bucket: bucket, + key: key, + adapter_type: adapter_type, + }.compact + end + end + attr_reader :bucket, :key, :adapter_type def initialize(adapter:, bucket:, key:) @@ -40,22 +122,10 @@ def filename # @example Download a key # BucketStore.for("inmemory://bucket/file.xml").download def download - raise ArgumentError, "Key cannot be empty" if key.empty? - - BucketStore.logger.info(event: "key_storage.download_started") - - start = BucketStore::Timing.monotonic_now buffer = StringIO.new - adapter.download(bucket: bucket, key: key, file: buffer) - - BucketStore.logger.info(event: "key_storage.download_finished", - duration: BucketStore::Timing.monotonic_now - start) - - { - bucket: bucket, - key: key, - content: buffer.string, - } + stream.download(file: buffer). + except(:file). + merge(content: buffer.string) end # Uploads the given file to the reference key location. @@ -67,23 +137,16 @@ def download # @example Upload a file # BucketStore.for("inmemory://bucket/file.xml").upload!("hello world") def upload!(content) - raise ArgumentError, "Key cannot be empty" if key.empty? - - BucketStore.logger.info(event: "key_storage.upload_started", - **log_context) - - start = BucketStore::Timing.monotonic_now - result = adapter.upload!( - bucket: bucket, - key: key, - file: StringIO.new(content), - ) + stream.upload!(file: StringIO.new(content)) + end - BucketStore.logger.info(event: "key_storage.upload_finished", - duration: BucketStore::Timing.monotonic_now - start, - **log_context) + # Returns an interface for streaming operations + # + # @return [KeyStreamer] An interface for streaming operations + def stream + raise ArgumentError, "Key cannot be empty" if key.empty? - "#{adapter_type}://#{result[:bucket]}/#{result[:key]}" + KeyStreamer.new(adapter: adapter, adapter_type: adapter_type, bucket: bucket, key: key) end # Lists all keys for the current adapter that have the reference key as prefix @@ -164,13 +227,5 @@ def exists? private attr_reader :adapter - - def log_context - { - bucket: bucket, - key: key, - adapter_type: adapter_type, - }.compact - end end end diff --git a/spec/bucket_store/key_storage_spec.rb b/spec/bucket_store/key_storage_spec.rb index ef3c993..8fd8814 100644 --- a/spec/bucket_store/key_storage_spec.rb +++ b/spec/bucket_store/key_storage_spec.rb @@ -75,10 +75,10 @@ def build_for(key) it "logs the operation" do expect(BucketStore.logger).to receive(:info).with( - hash_including(event: "key_storage.download_started"), + hash_including(event: "key_storage.stream.download_started"), ) expect(BucketStore.logger).to receive(:info).with( - hash_including(event: "key_storage.download_finished"), + hash_including(event: "key_storage.stream.download_finished"), ) build_for("inmemory://bucket/file1").download @@ -100,10 +100,10 @@ def build_for(key) it "logs the operation" do expect(BucketStore.logger).to receive(:info).with( - hash_including(event: "key_storage.upload_started"), + hash_including(event: "key_storage.stream.upload_started"), ) expect(BucketStore.logger).to receive(:info).with( - hash_including(event: "key_storage.upload_finished"), + hash_including(event: "key_storage.stream.upload_finished"), ) build_for("inmemory://bucket/file1").upload!("hello") @@ -117,6 +117,90 @@ def build_for(key) end end + describe "#stream" do + let(:stream) { build_for("inmemory://bucket/file1").stream } + + it "will return an object" do + expect { stream }.to_not raise_error + expect(stream).to_not be_nil + end + + context "when we try to upload a bucket" do + it "raises an error" do + expect { build_for("inmemory://bucket").stream }. + to raise_error(ArgumentError, /key cannot be empty/i) + end + end + + describe "#download" do + let(:input_file_1) { StringIO.new("content1") } + let(:input_file_2) { StringIO.new("content") } + let(:output_file) { StringIO.new } + + before do + build_for("inmemory://bucket/file1"). + stream. + upload!(file: StringIO.new("content1")) + build_for("inmemory://bucket/file2"). + stream. + upload!(file: StringIO.new("content2")) + end + + it "downloads the given file" do + expect( + build_for("inmemory://bucket/file1"). + stream. + download(file: output_file), + ). + to match(hash_including(file: output_file)) + expect(output_file.string).to eq("content1") + end + + it "logs the operation" do + expect(BucketStore.logger).to receive(:info).with( + hash_including(event: "key_storage.stream.download_started"), + ) + expect(BucketStore.logger).to receive(:info).with( + hash_including(event: "key_storage.stream.download_finished"), + ) + + build_for("inmemory://bucket/file1").stream.download(file: output_file) + end + + context "when we try to download a bucket" do + it "raises an error" do + expect { build_for("inmemory://bucket").download }. + to raise_error(ArgumentError, /key cannot be empty/i) + end + end + end + + describe "#upload!" do + it "will upload from a file" do + expect(stream.upload!(file: StringIO.new("hello"))). + to eq("inmemory://bucket/file1") + end + + it "logs the operation" do + expect(BucketStore.logger).to receive(:info).with( + hash_including(event: "key_storage.stream.upload_started"), + ) + expect(BucketStore.logger).to receive(:info).with( + hash_including(event: "key_storage.stream.upload_finished"), + ) + + stream.upload!(file: StringIO.new("hello")) + end + + context "when we try to upload a bucket" do + it "raises an error" do + expect { build_for("inmemory://bucket").upload!("content") }. + to raise_error(ArgumentError, /key cannot be empty/i) + end + end + end + end + describe "#delete!" do before do build_for("inmemory://bucket/file1").upload!("content1")