Skip to content

Commit

Permalink
feat(sat-etl): Skip already present entries
Browse files Browse the repository at this point in the history
  • Loading branch information
devsjc committed Dec 25, 2024
1 parent a023cae commit 5c5b07f
Showing 1 changed file with 35 additions and 17 deletions.
52 changes: 35 additions & 17 deletions containers/sat/download_process_sat.py
Original file line number Diff line number Diff line change
Expand Up @@ -62,8 +62,6 @@
"requests",
"satpy",
"urllib3",
"dask",
"dask.core",
]:
logging.getLogger(logger).setLevel(logging.ERROR)

Expand Down Expand Up @@ -295,21 +293,22 @@ def write_to_zarr(
},
}
try:
write_job = da.chunk({
"time": 1,
"x_geostationary": -1,
"y_geostationary": -1,
"variable": 1,
}).to_dataset(
name="data",
promote_attrs=True,
).to_zarr(
store=zarr_path,
compute=True,
consolidated=True,
mode=mode,
**extra_kwargs,
)
with numpy.errstate(divide="ignore"):
write_job = da.chunk({
"time": 1,
"x_geostationary": -1,
"y_geostationary": -1,
"variable": 1,
}).to_dataset(
name="data",
promote_attrs=True,
).to_zarr(
store=zarr_path,
compute=True,
consolidated=True,
mode=mode,
**extra_kwargs,
)
except Exception as e:
log.error(f"Error writing dataset to zarr store {zarr_path} with mode {mode}: {e}")
traceback.print_tb(e.__traceback__)
Expand Down Expand Up @@ -830,7 +829,26 @@ def run(args: argparse.Namespace) -> None:
token=_gen_token(),
)

# Use the existing zarr store if there is one. `ds` is initialised to None
# explicitly: a bare annotation does not bind the name, so without this the
# check inside the loop would raise UnboundLocalError whenever the store
# does not exist yet.
ds: xr.Dataset | None = None
zarr_path = folder / start.strftime(sat_config.zarr_fmtstr[dstype])
if zarr_path.exists():
    log.info(f"Using existing zarr store at '{zarr_path}'")
    ds = xr.open_zarr(zarr_path, consolidated=True)

# Iterate through all products in search
for product in tqdm(product_iter, total=total, miniters=50):

    # Skip products already present in store
    if ds is not None:
        # Truncate to the minute before comparing against the store's times.
        product_time: dt.datetime = product.sensing_start.replace(second=0, microsecond=0)
        # Compare as a point in time (datetime64), not a duration
        # (timedelta64): numpy cannot build a timedelta64 from a datetime.
        # NOTE(review): assumes sensing_start is timezone-naive and that the
        # stored "time" coordinate is datetime64 — confirm against the writer.
        if np.datetime64(product_time, "ns") in ds.coords["time"].values:
            log.debug(
                f"Skipping entry '{product!s}' as '{product_time}' already in store"
            )
            continue

# For non-existing products, download and process
nat_filepath = download_nat(
product=product,
folder=folder / args.sat,
Expand Down

0 comments on commit 5c5b07f

Please sign in to comment.