Skip to content

Commit

Permalink
fix(sat-etl): Existing ds fix
Browse files Browse the repository at this point in the history
  • Loading branch information
devsjc committed Dec 25, 2024
1 parent 99a0059 commit af336cb
Showing 1 changed file with 22 additions and 24 deletions.
46 changes: 22 additions & 24 deletions containers/sat/download_process_sat.py
Original file line number Diff line number Diff line change
Expand Up @@ -28,6 +28,7 @@
from ocf_blosc2 import Blosc2
from satpy import Scene
from tqdm import tqdm
import zarr

if sys.stdout.isatty():
# Simple logging for terminals
Expand All @@ -52,6 +53,7 @@
datefmt="%Y-%m-%dT%H:%M:%S",
)

# Reduce verbosity of dependacies
for logger in [
"cfgrib",
"charset_normalizer",
Expand All @@ -64,6 +66,7 @@
"urllib3",
]:
logging.getLogger(logger).setLevel(logging.ERROR)
np.seterr(divide="ignore")

log = logging.getLogger("sat-etl")

Expand Down Expand Up @@ -293,22 +296,21 @@ def write_to_zarr(
},
}
try:
with np.errstate(divide="ignore"):
write_job = da.chunk({
"time": 1,
"x_geostationary": -1,
"y_geostationary": -1,
"variable": 1,
}).to_dataset(
name="data",
promote_attrs=True,
).to_zarr(
store=zarr_path,
compute=True,
consolidated=True,
mode=mode,
**extra_kwargs,
)
write_job = da.chunk({
"time": 1,
"x_geostationary": -1,
"y_geostationary": -1,
"variable": 1,
}).to_dataset(
name="data",
promote_attrs=True,
).to_zarr(
store=zarr_path,
compute=True,
consolidated=True,
mode=mode,
**extra_kwargs,
)
except Exception as e:
log.error(f"Error writing dataset to zarr store {zarr_path} with mode {mode}: {e}")
traceback.print_tb(e.__traceback__)
Expand Down Expand Up @@ -802,7 +804,7 @@ def _calc_null_percentage(data: np.ndarray):
dask="parallelized",
)

num_images_failing_nulls_threshold = (result > 0.05).sum().item()
num_images_failing_nulls_threshold = (result > 0.05).sum().values
num_images = result.size
log.info(
f"{num_images_failing_nulls_threshold}/{num_images} "
Expand Down Expand Up @@ -830,7 +832,7 @@ def run(args: argparse.Namespace) -> None:
)

# Use existing zarr store if it exists
ds: xr.Dataset | None
ds: xr.Dataset | None = None
zarr_path = folder / start.strftime(sat_config.zarr_fmtstr[dstype])
if zarr_path.exists():
log.info(f"Using existing zarr store at '{zarr_path}'")
Expand All @@ -856,13 +858,10 @@ def run(args: argparse.Namespace) -> None:
if nat_filepath is None:
raise OSError(f"Failed to download product '{product}'")
da = process_nat(sat_config, nat_filepath, dstype)
write_to_zarr(
da=da,
zarr_path=folder / start.strftime(sat_config.zarr_fmtstr[dstype]),
)
write_to_zarr(da=da, zarr_path=zarr_path)

runtime = dt.datetime.now(tz=dt.UTC) - prog_start
log.info("Completed archive for args: {args} in {runtime!s}.")
log.info(f"Completed archive for args: {args} in {runtime!s}.")

# Download data
# We only parallelize if we have a number of files larger than the cpu count
Expand Down Expand Up @@ -904,7 +903,6 @@ def run(args: argparse.Namespace) -> None:
#log.info(f"Completed archive for args: {args}. ({new_average_secs_per_scan} seconds per scan).")

if args.validate:
zarr_path: pathlib.Path = folder.parent / start.strftime(sat_config.zarr_fmtstr[dstype])
ds = xr.open_zarr(zarr_path, consolidated=True)
check_data_quality(ds)

Expand Down

0 comments on commit af336cb

Please sign in to comment.