From 5fbcba9b863722dd80e1c4854b24f284364b240d Mon Sep 17 00:00:00 2001 From: devsjc <47188100+devsjc@users.noreply.github.com> Date: Thu, 30 May 2024 09:22:11 +0100 Subject: [PATCH 1/4] feat(s3): Don't upload if file of same name and size already exists --- src/nwp_consumer/internal/outputs/s3/client.py | 11 +++++++++++ 1 file changed, 11 insertions(+) diff --git a/src/nwp_consumer/internal/outputs/s3/client.py b/src/nwp_consumer/internal/outputs/s3/client.py index 9d445ef8..93fe0be6 100644 --- a/src/nwp_consumer/internal/outputs/s3/client.py +++ b/src/nwp_consumer/internal/outputs/s3/client.py @@ -74,6 +74,17 @@ def store(self, *, src: pathlib.Path, dst: pathlib.Path) -> pathlib.Path: src=src.as_posix(), dst=(self.__bucket / dst).as_posix(), ) + + # If file already exists in store and is of the same size, skip the upload + if self.exists(dst=dst) and self.__fs.du((self.__bucket / dst).as_posix()) == src.stat().st_size: + log.debug( + event="file of same size already exists in s3, skipping", + src=src.as_posix(), + dst=(self.__bucket / dst).as_posix(), + ) + return dst + + # Upload the file to the store self.__fs.put(lpath=src.as_posix(), rpath=(self.__bucket / dst).as_posix(), recursive=True) # Don't delete cached file as user may want to do further processing locally. nbytes = self.__fs.du((self.__bucket / dst).as_posix()) From 21050817b0d9aada16d4d5bb28836139aa96140c Mon Sep 17 00:00:00 2001 From: devsjc <47188100+devsjc@users.noreply.github.com> Date: Thu, 30 May 2024 09:28:01 +0100 Subject: [PATCH 2/4] feat(s3): Add test for non-overwrite method --- .../internal/outputs/s3/test_client.py | 25 +++++++++++++++++++ 1 file changed, 25 insertions(+) diff --git a/src/nwp_consumer/internal/outputs/s3/test_client.py b/src/nwp_consumer/internal/outputs/s3/test_client.py index ab59dbe0..893542ea 100644 --- a/src/nwp_consumer/internal/outputs/s3/test_client.py +++ b/src/nwp_consumer/internal/outputs/s3/test_client.py @@ -122,6 +122,31 @@ def test_store(self) -> None: self.testS3.delete_object(Bucket=BUCKET, Key=dst.as_posix()) src.unlink(missing_ok=True) + ## Test the store doesn't overwrite an existing file of equivalent size + + # Create a mock file in the store + self.testS3.put_object( + Bucket=BUCKET, + Key=dst.as_posix(), + Body=bytes(fileName, "utf-8"), + ) + + # Create a temporary file with the same data + src.write_bytes(bytes(fileName, "utf-8")) + + # Get the modified date of the file in the store + response = self.testS3.head_object(Bucket=BUCKET, Key=dst.as_posix()) + lastModified = response["LastModified"] + + # Call the store method on the file + name = self.client.store(src=src, dst=dst) + + # Verify the file in the store was not overwritten + response = self.testS3.get_object(Bucket=BUCKET, Key=dst.as_posix()) + self.assertEqual(response["Body"].read(), bytes(fileName, "utf-8")) + self.assertEqual(lastModified, response["LastModified"]) + + def test_listInitTimes(self) -> None: # Create mock folders/files in the raw directory self.testS3.put_object( From 44f7fb15e1ab5522539ea6dbfd6932d430f13ffb Mon Sep 17 00:00:00 2001 From: devsjc <47188100+devsjc@users.noreply.github.com> Date: Thu, 30 May 2024 11:36:56 +0100 Subject: [PATCH 3/4] chore(ceda): Print dataset in test --- src/nwp_consumer/internal/inputs/ceda/client.py | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/src/nwp_consumer/internal/inputs/ceda/client.py b/src/nwp_consumer/internal/inputs/ceda/client.py index e4688ec6..c479be61 100644 --- a/src/nwp_consumer/internal/inputs/ceda/client.py +++ b/src/nwp_consumer/internal/inputs/ceda/client.py @@ -244,7 +244,7 @@ def mapCachedRaw(self, *, p: pathlib.Path) -> xr.Dataset: }, ) ) - + print(wholesaleDataset) return wholesaleDataset def parameterConformMap(self) -> dict[str, internal.OCFParameter]: From 53c2d1f86ca36522570d1243e050e782debe01cd Mon Sep 17 00:00:00 2001 From: devsjc <47188100+devsjc@users.noreply.github.com> Date: Thu, 30 May 2024 11:48:28 +0100 Subject: [PATCH 4/4] fix(ceda): Remove index creation --- src/nwp_consumer/internal/inputs/ceda/client.py | 2 +- src/nwp_consumer/internal/inputs/ceda/test_client.py | 4 ++-- 2 files changed, 3 insertions(+), 3 deletions(-) diff --git a/src/nwp_consumer/internal/inputs/ceda/client.py b/src/nwp_consumer/internal/inputs/ceda/client.py index c479be61..d190953a 100644 --- a/src/nwp_consumer/internal/inputs/ceda/client.py +++ b/src/nwp_consumer/internal/inputs/ceda/client.py @@ -177,6 +177,7 @@ def mapCachedRaw(self, *, p: pathlib.Path) -> xr.Dataset: datasets: list[xr.Dataset] = cfgrib.open_datasets( path=p.as_posix(), chunks={"time": 1, "step": -1, "variable": -1, "x": "auto", "y": "auto"}, + backend_kwargs={"indexpath": ""}, ) except Exception as e: log.warn(event="error converting raw file to dataset", filepath=p.as_posix(), error=e) @@ -244,7 +245,6 @@ def mapCachedRaw(self, *, p: pathlib.Path) -> xr.Dataset: }, ) ) - print(wholesaleDataset) return wholesaleDataset def parameterConformMap(self) -> dict[str, internal.OCFParameter]: diff --git a/src/nwp_consumer/internal/inputs/ceda/test_client.py b/src/nwp_consumer/internal/inputs/ceda/test_client.py index 5b4a4b95..1eb5d587 100644 --- a/src/nwp_consumer/internal/inputs/ceda/test_client.py +++ b/src/nwp_consumer/internal/inputs/ceda/test_client.py @@ -40,7 +40,7 @@ def test_convertsWholesale1FileCorrectly(self) -> None: # Ensure the dimensions have the right sizes self.assertDictEqual( - {"init_time": 1, "step": 3, "y": 704, "x": 548}, + {"init_time": 1, "step": 4, "y": 704, "x": 548}, dict(out.sizes.items()), ) # Ensure the correct variables are in the variable dimension @@ -56,7 +56,7 @@ def test_convertsWholesale2FileCorrectly(self) -> None: # Ensure the dimensions have the right sizes self.assertDictEqual( - {"init_time": 1, "step": 3, "y": 704, "x": 548}, + {"init_time": 1, "step": 4, "y": 704, "x": 548}, dict(out.sizes.items()), ) # Ensure the correct variables are in the variable dimension