diff --git a/src/nwp_consumer/internal/inputs/ceda/client.py b/src/nwp_consumer/internal/inputs/ceda/client.py index e4688ec6..d190953a 100644 --- a/src/nwp_consumer/internal/inputs/ceda/client.py +++ b/src/nwp_consumer/internal/inputs/ceda/client.py @@ -177,6 +177,7 @@ def mapCachedRaw(self, *, p: pathlib.Path) -> xr.Dataset: datasets: list[xr.Dataset] = cfgrib.open_datasets( path=p.as_posix(), chunks={"time": 1, "step": -1, "variable": -1, "x": "auto", "y": "auto"}, + backend_kwargs={"indexpath": ""}, ) except Exception as e: log.warn(event="error converting raw file to dataset", filepath=p.as_posix(), error=e) @@ -244,7 +245,6 @@ def mapCachedRaw(self, *, p: pathlib.Path) -> xr.Dataset: }, ) ) - return wholesaleDataset def parameterConformMap(self) -> dict[str, internal.OCFParameter]: diff --git a/src/nwp_consumer/internal/inputs/ceda/test_client.py b/src/nwp_consumer/internal/inputs/ceda/test_client.py index 5b4a4b95..1eb5d587 100644 --- a/src/nwp_consumer/internal/inputs/ceda/test_client.py +++ b/src/nwp_consumer/internal/inputs/ceda/test_client.py @@ -40,7 +40,7 @@ def test_convertsWholesale1FileCorrectly(self) -> None: # Ensure the dimensions have the right sizes self.assertDictEqual( - {"init_time": 1, "step": 3, "y": 704, "x": 548}, + {"init_time": 1, "step": 4, "y": 704, "x": 548}, dict(out.sizes.items()), ) # Ensure the correct variables are in the variable dimension @@ -56,7 +56,7 @@ def test_convertsWholesale2FileCorrectly(self) -> None: # Ensure the dimensions have the right sizes self.assertDictEqual( - {"init_time": 1, "step": 3, "y": 704, "x": 548}, + {"init_time": 1, "step": 4, "y": 704, "x": 548}, dict(out.sizes.items()), ) # Ensure the correct variables are in the variable dimension diff --git a/src/nwp_consumer/internal/outputs/s3/client.py b/src/nwp_consumer/internal/outputs/s3/client.py index 9d445ef8..93fe0be6 100644 --- a/src/nwp_consumer/internal/outputs/s3/client.py +++ b/src/nwp_consumer/internal/outputs/s3/client.py @@ -74,6 +74,17 @@ def store(self, *, src: pathlib.Path, dst: pathlib.Path) -> pathlib.Path: src=src.as_posix(), dst=(self.__bucket / dst).as_posix(), ) + + # If file already exists in store and is of the same size, skip the upload + if self.exists(dst=dst) and self.__fs.du((self.__bucket / dst).as_posix()) == src.stat().st_size: + log.debug( + event="file of same size already exists in s3, skipping", + src=src.as_posix(), + dst=(self.__bucket / dst).as_posix(), + ) + return dst + + # Upload the file to the store self.__fs.put(lpath=src.as_posix(), rpath=(self.__bucket / dst).as_posix(), recursive=True) # Don't delete cached file as user may want to do further processing locally. nbytes = self.__fs.du((self.__bucket / dst).as_posix()) diff --git a/src/nwp_consumer/internal/outputs/s3/test_client.py b/src/nwp_consumer/internal/outputs/s3/test_client.py index ab59dbe0..893542ea 100644 --- a/src/nwp_consumer/internal/outputs/s3/test_client.py +++ b/src/nwp_consumer/internal/outputs/s3/test_client.py @@ -122,6 +122,31 @@ def test_store(self) -> None: self.testS3.delete_object(Bucket=BUCKET, Key=dst.as_posix()) src.unlink(missing_ok=True) + ## Test the store doesn't overwrite an existing file of equivalent size + + # Create a mock file in the store + self.testS3.put_object( + Bucket=BUCKET, + Key=dst.as_posix(), + Body=bytes(fileName, "utf-8"), + ) + + # Create a temporary file with the same data + src.write_bytes(bytes(fileName, "utf-8")) + + # Get the modified date of the file in the store + response = self.testS3.head_object(Bucket=BUCKET, Key=dst.as_posix()) + lastModified = response["LastModified"] + + # Call the store method on the file + name = self.client.store(src=src, dst=dst) + + # Verify the file in the store was not overwritten + response = self.testS3.get_object(Bucket=BUCKET, Key=dst.as_posix()) + self.assertEqual(response["Body"].read(), bytes(fileName, "utf-8")) + self.assertEqual(lastModified, response["LastModified"]) + + def test_listInitTimes(self) -> None: # Create mock folders/files in the raw directory self.testS3.put_object(