From bc5a57c7cedbe0994f80593a34e3dfedcb526fb6 Mon Sep 17 00:00:00 2001
From: Tulsi Shah <46474643+Tulsishah@users.noreply.github.com>
Date: Mon, 28 Oct 2024 09:37:56 +0530
Subject: [PATCH] feat: Support error code for single shot upload (#699)

* support error for single shot upload

* remove .idea files

* lint fix

* review comments

* review comments

* review comments

* lint fix

* lint fix

* lint fix
---
 testbench/rest_server.py      |  41 +++++++++----
 tests/test_testbench_retry.py | 106 ++++++++++++++++++++++++++++++++++
 2 files changed, 136 insertions(+), 11 deletions(-)

diff --git a/testbench/rest_server.py b/testbench/rest_server.py
index 72a0ae88..5e58296f 100644
--- a/testbench/rest_server.py
+++ b/testbench/rest_server.py
@@ -979,17 +979,36 @@ def object_insert(bucket_name):
         blob, projection = gcs_type.object.Object.init_media(flask.request, bucket)
     elif upload_type == "multipart":
         blob, projection = gcs_type.object.Object.init_multipart(flask.request, bucket)
-    # Handle stall for full uploads.
-    testbench.common.extract_instruction(request, context=None)
-    (
-        stall_time,
-        after_bytes,
-        test_id,
-    ) = testbench.common.get_stall_uploads_after_bytes(db, request)
-    if stall_time and len(blob.media) >= after_bytes:
-        if test_id:
-            db.dequeue_next_instruction(test_id, "storage.objects.insert")
-        time.sleep(stall_time)
+
+    # Handle errors for single-shot uploads.
+    testbench.common.extract_instruction(request, context=None)
+    (
+        error_code,
+        after_bytes,
+        test_id,
+    ) = testbench.common.get_retry_uploads_error_after_bytes(db, request)
+
+    if error_code and len(blob.media) >= after_bytes:
+        if test_id:
+            db.dequeue_next_instruction(test_id, "storage.objects.insert")
+        testbench.error.generic(
+            "Fault injected during a single-shot upload",
+            rest_code=error_code,
+            grpc_code=None,
+            context=None,
+        )
+
+    # Handle stall for single-shot uploads.
+    testbench.common.extract_instruction(request, context=None)
+    (
+        stall_time,
+        after_bytes,
+        test_id,
+    ) = testbench.common.get_stall_uploads_after_bytes(db, request)
+    if stall_time and len(blob.media) >= after_bytes:
+        if test_id:
+            db.dequeue_next_instruction(test_id, "storage.objects.insert")
+        time.sleep(stall_time)
 
     db.insert_object(
         bucket_name,
diff --git a/tests/test_testbench_retry.py b/tests/test_testbench_retry.py
index 0030993e..5a6fc4f6 100644
--- a/tests/test_testbench_retry.py
+++ b/tests/test_testbench_retry.py
@@ -880,6 +880,112 @@ def test_write_retry_test_stall_single_shot_while_upload_size_less_than_stall_si
         self.assertEqual(response.status_code, 200)
         self.assertLess(elapsed_time, 1)
 
+    def test_retry_test_return_error_after_bytes_for_single_shot_upload(self):
+        # Create a new bucket
+        response = self.client.post(
+            "/storage/v1/b", data=json.dumps({"name": "bucket-name"})
+        )
+        self.assertEqual(response.status_code, 200)
+
+        # Set up a 503 error for the single-shot upload.
+        response = self.client.post(
+            "/retry_test",
+            data=json.dumps(
+                {
+                    "instructions": {
+                        "storage.objects.insert": [
+                            "return-503-after-250K",
+                        ]
+                    }
+                }
+            ),
+            content_type="application/json",
+        )
+        self.assertEqual(response.status_code, 200)
+        self.assertTrue(
+            response.headers.get("content-type").startswith("application/json")
+        )
+
+        create_rest = json.loads(response.data)
+        self.assertIn("id", create_rest)
+        test_id = create_rest.get("id")
+
+        # Upload 256KiB of data and trigger the injected error.
+        data = self._create_block(UPLOAD_QUANTUM)
+        self.assertEqual(len(data), UPLOAD_QUANTUM)
+
+        boundary, payload = format_multipart_upload({}, data)
+        response = self.client.post(
+            "/upload/storage/v1/b/bucket-name/o",
+            query_string={"uploadType": "multipart", "name": "stall"},
+            content_type="multipart/related; boundary=" + boundary,
+            headers={
+                "x-retry-test-id": test_id,
+            },
+            data=payload,
+        )
+        self.assertEqual(response.status_code, 503)
+
+        # Upload the data again and verify the error is not returned a second time.
+        response = self.client.post(
+            "/upload/storage/v1/b/bucket-name/o",
+            query_string={"uploadType": "multipart", "name": "stall"},
+            content_type="multipart/related; boundary=" + boundary,
+            headers={
+                "x-retry-test-id": test_id,
+            },
+            data=payload,
+        )
+        self.assertEqual(response.status_code, 200)
+
+    def test_write_retry_error_single_shot_while_upload_size_less_than_size(
+        self,
+    ):
+        # Create a new bucket
+        response = self.client.post(
+            "/storage/v1/b", data=json.dumps({"name": "bucket-name"})
+        )
+        self.assertEqual(response.status_code, 200)
+
+        # Set up a 503 error for the single-shot upload.
+        response = self.client.post(
+            "/retry_test",
+            data=json.dumps(
+                {
+                    "instructions": {
+                        "storage.objects.insert": [
+                            "return-503-after-250K",
+                        ]
+                    }
+                }
+            ),
+            content_type="application/json",
+        )
+        self.assertEqual(response.status_code, 200)
+        self.assertTrue(
+            response.headers.get("content-type").startswith("application/json")
+        )
+
+        create_rest = json.loads(response.data)
+        self.assertIn("id", create_rest)
+        test_id = create_rest.get("id")
+
+        # Upload 200KiB of data, below the 250K threshold, and verify no error is returned.
+        data = self._create_block(200 * 1024)
+        self.assertEqual(len(data), 200 * 1024)
+
+        boundary, payload = format_multipart_upload({}, data)
+        response = self.client.post(
+            "/upload/storage/v1/b/bucket-name/o",
+            query_string={"uploadType": "multipart", "name": "error"},
+            content_type="multipart/related; boundary=" + boundary,
+            headers={
+                "x-retry-test-id": test_id,
+            },
+            data=payload,
+        )
+        self.assertEqual(response.status_code, 200)
+
 
 class TestTestbenchRetryGrpc(unittest.TestCase):
     def setUp(self):