Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

adding zlib compression to netcdf #301

Merged
merged 7 commits into from
Nov 24, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
38 changes: 0 additions & 38 deletions src/pypromice/process/aws.py
Original file line number Diff line number Diff line change
Expand Up @@ -123,22 +123,6 @@ def process(self):
self.getL2()
self.getL3()

def writeL2(self, outpath):
"""Write L2 data to .csv and .nc file"""
if os.path.isdir(outpath):
self.writeArr(self.L2, outpath)
else:
logger.info(f"Outpath f{outpath} does not exist. Unable to save to file")
pass

def writeL3(self, outpath):
"""Write L3 data to .csv and .nc file"""
if os.path.isdir(outpath):
self.writeArr(self.L3, outpath)
else:
logger.info(f"Outpath f{outpath} does not exist. Unable to save to file")
pass

def getL1(self):
"""Perform L0 to L1 data processing"""
logger.info("Level 1 processing...")
Expand All @@ -164,28 +148,6 @@ def getL3(self):
logger.info("Level 3 processing...")
self.L3 = toL3(self.L2, data_adjustments_dir=self.data_issues_repository / "adjustments")

def writeArr(self, dataset, outpath, t=None):
"""Write L3 data to .nc and .csv hourly and daily files

Parameters
----------
dataset : xarray.Dataset
Dataset to write to file
outpath : str
Output directory
t : str
Resampling string. This is automatically defined based
on the data type if not given. The default is None.
"""
if t is not None:
write.prepare_and_write(dataset, outpath, self.vars, self.meta, t)
else:
f = [l.attrs["format"] for l in self.L0]
if "raw" in f or "STM" in f:
write.prepare_and_write(dataset, outpath, self.vars, self.meta, "10min")
else:
write.prepare_and_write(dataset, outpath, self.vars, self.meta, "60min")

def loadConfig(self, config_file, inpath):
"""Load configuration from .toml file

Expand Down
6 changes: 3 additions & 3 deletions src/pypromice/process/join_l3.py
Original file line number Diff line number Diff line change
Expand Up @@ -538,9 +538,9 @@ def join_l3(config_folder, site, folder_l3, folder_gcnet, outpath, variables, me
v = pypromice.resources.load_variables(variables)
m = pypromice.resources.load_metadata(metadata)
if outpath is not None:
prepare_and_write(l3_merged, outpath, v, m, "60min")
prepare_and_write(l3_merged, outpath, v, m, "1D")
prepare_and_write(l3_merged, outpath, v, m, "M")
prepare_and_write(l3_merged, outpath, v, m, "60min", nc_compression=True)
prepare_and_write(l3_merged, outpath, v, m, "1D", nc_compression=True)
prepare_and_write(l3_merged, outpath, v, m, "M", nc_compression=True)
return l3_merged, sorted_list_station_data


Expand Down
52 changes: 18 additions & 34 deletions src/pypromice/process/write.py
Original file line number Diff line number Diff line change
Expand Up @@ -17,7 +17,13 @@


def prepare_and_write(
dataset, output_path: Path | str, vars_df=None, meta_dict=None, time="60min", resample=True
dataset,
output_path: Path | str,
vars_df=None,
meta_dict=None,
time="60min",
resample=True,
nc_compression:bool=False,
):
"""Prepare data with resampling, formating and metadata population; then
write data to .nc and .csv hourly and daily files
Expand Down Expand Up @@ -117,40 +123,11 @@ def prepare_and_write(
writeCSV(out_csv, d2, col_names)

# Write to netcdf file
writeNC(out_nc, d2, col_names)
writeNC(out_nc, d2, col_names, compression=nc_compression)
logger.info(f"Written to {out_csv}")
logger.info(f"Written to {out_nc}")


def writeAll(outpath, station_id, l3_h, l3_d, l3_m, csv_order=None):
"""Write L3 hourly, daily and monthly datasets to .nc and .csv
files

Parameters
----------
outpath : str
Output file path
station_id : str
Station name
l3_h : xr.Dataset
L3 hourly data
l3_d : xr.Dataset
L3 daily data
l3_m : xr.Dataset
L3 monthly data
csv_order : list, optional
List order of variables
"""
if not os.path.isdir(outpath):
os.mkdir(outpath)
outfile_h = os.path.join(outpath, station_id + "_hour")
outfile_d = os.path.join(outpath, station_id + "_day")
outfile_m = os.path.join(outpath, station_id + "_month")
for o, l in zip([outfile_h, outfile_d, outfile_m], [l3_h, l3_d, l3_m]):
writeCSV(o + ".csv", l, csv_order)
writeNC(o + ".nc", l)


def writeCSV(outfile, Lx, csv_order):
"""Write data product to CSV file

Expand All @@ -170,8 +147,8 @@ def writeCSV(outfile, Lx, csv_order):
Lcsv.to_csv(outfile)


def writeNC(outfile, Lx, col_names=None):
"""Write data product to NetCDF file
def writeNC(outfile, Lx, col_names=None, compression=False):
"""Write data product to NetCDF file with compression

Parameters
----------
Expand All @@ -187,7 +164,14 @@ def writeNC(outfile, Lx, col_names=None):
else:
names = list(Lx.keys())

Lx[names].to_netcdf(outfile, mode="w", format="NETCDF4", compute=True)
encoding = {var: dict() for var in names}

if compression:
comp = dict(zlib=True, complevel=4)
for var in names:
encoding[var].update(comp)

Lx[names].to_netcdf(outfile, mode="w", format="NETCDF4", compute=True, encoding=encoding)


def getColNames(vars_df, ds, remove_nan_fields=False):
Expand Down
24 changes: 24 additions & 0 deletions tests/e2e/test_process.py
Original file line number Diff line number Diff line change
Expand Up @@ -207,6 +207,30 @@ def test_full_e2e(self):
output_dataset = xr.load_dataset(output_path)
self.check_global_attributes(output_dataset, output_rel_path)

# Check if the l3 datasets are compressed
if output_path.parent.parent.name == 'site_l3':
self.assertEqual(output_dataset['p_u'].encoding["zlib"], True, output_rel_path)
else:
self.assertEqual(output_dataset['p_u'].encoding["zlib"], False, output_rel_path)

# Test if the l3 output netcdf files are compressed with zlib
for output_rel_path in [
"station_l3/TEST1/TEST1_day.nc",
"station_l3/TEST1/TEST1_hour.nc",
"station_l3/TEST1/TEST1_month.nc",
"site_l3/SITE_01/SITE_01_day.nc",
"site_l3/SITE_01/SITE_01_hour.nc",
"site_l3/SITE_01/SITE_01_month.nc",
]:
output_path = root / output_rel_path
output_dataset = xr.load_dataset(output_path)
for var in output_dataset.variables:
# %%
print(var, output_dataset[var].encoding)
continue
self.assertEqual(output_dataset[var].encoding["zlib"], True)


def check_global_attributes(self, dataset: xr.Dataset, reference: str):
attribute_keys = set(dataset.attrs.keys())
highly_recommended_global_attributes = {
Expand Down
Loading