Skip to content

Commit

Permalink
Revert to old s3 file
Browse files · Browse the repository at this point in the history
  • Loading branch information
peterdudfield committed Jan 10, 2025
1 parent bfee46b commit 5a8956b
Show file tree
Hide file tree
Showing 2 changed files with 12 additions and 25 deletions.
36 changes: 12 additions & 24 deletions terraform/modules/services/airflow/dags/utils/s3.py
Original file line number Diff line number Diff line change
@@ -1,8 +1,6 @@
from airflow.providers.amazon.aws.hooks.s3 import S3Hook
from airflow.decorators import task

import xarray as xr

@task(task_id="determine_latest_zarr")
def determine_latest_zarr(bucket: str, prefix: str):
s3hook = S3Hook(aws_conn_id=None) # Use Boto3 default connection strategy
Expand All @@ -26,42 +24,32 @@ def determine_latest_zarr(bucket: str, prefix: str):
size_old += obj.size

# If the sizes are different, create a new latest.zarr
s3hook.log.info(f"size_old={size_old}, size_new={size_new}")
if size_old != size_new and size_new > 500 * 1e3: # Expecting at least 500KB

# open file
s3hook.log.info(f"Opening {zarrs[0]}")
ds = xr.open_zarr(f"s3://{bucket}/{prefix}/{zarrs[0]}")

# re-chunk
s3hook.log.info("Re-chunking")
ds = ds.chunk({"init_time": 1,
"step": len(ds.step) // 4,
"variable": len(ds.variable),
"latitude": len(ds.latitude) // 2,
"longitude": len(ds.longitude) // 2})

# save to latest_temp.zarr
s3hook.log.info(f"Saving {prefix}/latest_temp.zarr/")
ds.to_zarr(f"s3://{bucket}/{prefix}/latest_temp.zarr/", mode="w")

# delete latest.zarr
s3hook.log.info(f"Deleting {prefix}/latest.zarr/")
if prefix + "/latest.zarr/" in prefixes:
s3hook.log.debug(f"Deleting {prefix}/latest.zarr/")
keys_to_delete = s3hook.list_keys(bucket_name=bucket, prefix=prefix + "/latest.zarr/")
s3hook.delete_objects(bucket=bucket, keys=keys_to_delete)

# move latest_temp.zarr to latest.zarr
s3hook.log.info(f"Move {prefix}/latest_temp.zarr/ to {prefix}/latest.zarr/")
keys_to_move = s3hook.list_keys(bucket_name=bucket, prefix=prefix + "/latest_temp.zarr/")
for key in keys_to_move:
#

# move latest zarr file to latest.zarr using s3 batch jobs
s3hook.log.info(f"Creating {prefix}/latest.zarr/")

# Copy the new latest.zarr
s3hook.log.info(f"Copying {zarrs[0]} to {prefix}/latest.zarr/")
source_keys = s3hook.list_keys(bucket_name=bucket, prefix=zarrs[0])
for key in source_keys:
s3hook.copy_object(
source_bucket_name=bucket,
source_bucket_key=key,
dest_bucket_name=bucket,
dest_bucket_key=prefix + "/latest.zarr/" + key.split(prefix + "/latest_temp.zarr/")[-1],
dest_bucket_key=prefix + "/latest.zarr/" + key.split(zarrs[0])[-1],
)
s3hook.delete_objects(bucket=bucket, keys=keys_to_move)
s3hook.delete_objects(bucket=bucket, keys=source_keys)

else:
s3hook.log.info("No changes to latest.zarr required")
Expand Down
1 change: 0 additions & 1 deletion terraform/modules/services/airflow/docker-compose.yml
Original file line number Diff line number Diff line change
Expand Up @@ -41,7 +41,6 @@ services:
ECS_SECURITY_GROUP: $ECS_SECURITY_GROUP
_AIRFLOW_WWW_USER_PASSWORD: ${PASSWORD}
AIRFLOW_CONN_SLACK_API_DEFAULT: ${AIRFLOW_CONN_SLACK_API_DEFAULT}
_PIP_ADDITIONAL_REQUIREMENTS: ${_PIP_ADDITIONAL_REQUIREMENTS:- xarray }

user: "${AIRFLOW_UID:-50000}:0"
volumes:
Expand Down

0 comments on commit 5a8956b

Please sign in to comment.