stream .nc not in buckets
JordanLaserGit committed Dec 29, 2023
1 parent 898b770 commit 9c1a733
Showing 1 changed file with 21 additions and 13 deletions.
34 changes: 21 additions & 13 deletions forcingprocessor/src/forcingprocessor/forcingprocessor.py
@@ -1,6 +1,6 @@
 import pandas as pd
 import argparse, os, json, sys
-import fsspec
+import requests
 import s3fs
 import gcsfs
 from pathlib import Path
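The import swap above is the heart of the commit: .nc files that are not hosted in an S3/GCS bucket but are served over plain HTTPS are now fetched with requests instead of being routed through a bucket filesystem. A minimal sketch of that pattern, assuming a hypothetical URL and the same h5netcdf engine the module uses:

```python
# Sketch only: download a remote .nc over HTTPS and open it from memory,
# the pattern this commit adds for files not hosted in a cloud bucket.
from io import BytesIO

import requests
import xarray as xr

url = "https://example.com/nwm.t00z.short_range.forcing.f001.conus.nc"  # hypothetical
response = requests.get(url)
response.raise_for_status()  # surface a bad download immediately
with xr.open_dataset(BytesIO(response.content), engine="h5netcdf") as ds:
    print(list(ds.data_vars))  # inspect the available forcing variables
```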
@@ -137,7 +137,7 @@ def forcing_grid2catchment(crosswalk_dict: dict, nwm_files: list, var_list: list
     wgt_file: a path to the weights json,
     filelist: list of filenames (urls for remote, local paths otherwise),
     var_list: list (list of variable names to read),
-    jt: the index to place the file. This is used to ensure elements increase in time, regardless of thread number,
+    fs: an optional file system for cloud storage reads
     Outputs:
     df_by_t : (returned for local files) a list (in time) of forcing data. Note that this list may not be consistent in time
@@ -151,24 +151,25 @@ def forcing_grid2catchment(crosswalk_dict: dict, nwm_files: list, var_list: list
     data_list = []
     t_list = []
     nfiles = len(nwm_files)
-    if nwm_files[0].find('googleapis') >= 0:
-        fs = gcsfs.GCSFileSystem()
-        fs_type = 'google'
-    elif nwm_files[0].find('s3.amazon') >= 0:
-        fs_type = 's3'
+    if fs_type == 'google' : fs = gcsfs.GCSFileSystem()
     id = os.getpid()
     if ii_verbose: print(f'{id} extracting data from {nfiles} files',end=None,flush=True)
     for j, nwm_file in enumerate(nwm_files):
         t0 = time.perf_counter()
         eng = "h5netcdf"
-        if fs is not None:
-            bucket_key = convert_url2key(nwm_file,fs_type)
+        if fs:
+            if nwm_file.find('https://') >= 0: bucket_key = convert_url2key(nwm_file,fs_type)
+            else: bucket_key = nwm_file
             file_obj = fs.open(bucket_key, mode='rb')
+        elif 'https://' in nwm_file:
+            response = requests.get(nwm_file)
+            file_obj = BytesIO(response.content)
         else:
             file_obj = nwm_file
+
         topen += time.perf_counter() - t0
         t0 = time.perf_counter()

         with xr.open_dataset(file_obj, engine=eng) as nwm_data:
             txrds += time.perf_counter() - t0
             t0 = time.perf_counter()
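Read as a whole, the loop now dispatches on three cases: a cloud filesystem handle, a bare HTTPS URL, or a local path. A condensed sketch of that dispatch, with convert_url2key stubbed (its real definition lives elsewhere in forcingprocessor.py; the stub's behavior is an assumption):

```python
# Condensed sketch of the three-way open logic in the hunk above.
from io import BytesIO

import requests

def convert_url2key(url, fs_type):
    # Stand-in for the module's helper; assumed to strip the HTTPS host and
    # return a bucket/key path usable by s3fs/gcsfs.
    return url.split(".com/")[-1]

def open_forcing_file(nwm_file, fs=None, fs_type=None):
    """Return a file-like object (or local path) that xarray can open."""
    if fs:
        # Bucket read: translate an https:// URL to a bucket key if needed.
        key = convert_url2key(nwm_file, fs_type) if "https://" in nwm_file else nwm_file
        return fs.open(key, mode="rb")
    if "https://" in nwm_file:
        # Plain HTTPS with no bucket filesystem: stream the bytes into memory.
        return BytesIO(requests.get(nwm_file).content)
    # Local file: xarray takes the path directly.
    return nwm_file
```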
@@ -444,7 +445,7 @@ def prep_ngen_data(conf):
         "VGRD_10maboveground",
         "DLWRF_surface",
         "APCP_surface",
-        "precip_rate", # BROKEN (Identical to APCP!)
+        "precip_rate", # HACK RAINRATE * 3600
         "TMP_2maboveground",
         "SPFH_2maboveground",
         "PRES_surface",
@@ -557,17 +558,19 @@ def prep_ngen_data(conf):

     nfiles = len(nwm_forcing_files)

-    if nwm_forcing_files[0].find('s3.amazonaws') >= 0:
+    global fs_type
+    if 's3://' in nwm_forcing_files[0] or 's3.amazonaws' in nwm_forcing_files[0]:
         fs = s3fs.S3FileSystem(
             anon=True,
             client_kwargs={'region_name': 'us-east-1'}
         )
         fs_type = 's3'
-    elif nwm_forcing_files[0].find('storage.googleapis') >= 0:
+    elif 'google' in nwm_forcing_files[0] or 'gs://' in nwm_forcing_files[0] or 'gcs://' in nwm_forcing_files[0]:
         fs = "google"
         fs_type = 'google'
     else:
         fs = None
+        fs_type = None

     if ii_verbose:
         print(f"NWM file names:")
@@ -593,6 +596,7 @@ def prep_ngen_data(conf):
             jnwm_files = nwm_forcing_files[start:end]
             t0 = time.perf_counter()
             if ii_verbose: print(f'Entering data extraction...\n')
+            # [data_array, t_ax] = forcing_grid2catchment(crosswalk_dict, jnwm_files, var_list, fs)
             data_array, t_ax = multiprocess_data_extract(jnwm_files,proc_threads,crosswalk_dict,var_list,fs)
             t_extract = time.perf_counter() - t0
             complexity = (nfiles_tot * ncatchments) / 10000
@@ -624,10 +628,14 @@ def prep_ngen_data(conf):
         for j, jfile in enumerate(nwm_forcing_files):
             if j > 10: break
             if fs:
-                bucket_key = convert_url2key(jfile, fs_type)
+                if jfile.find('https://') >= 0: bucket_key = convert_url2key(jfile, fs_type)
+                else: bucket_key = jfile
+                if fs_type == 'google': fs = gcsfs.GCSFileSystem()

                 response = fs.open(bucket_key, mode='rb')
                 nwm_file_sizes.append(response.details['size'])
+            elif jfile.find('https://') >= 0:
+                nwm_file_sizes = len(requests.get(jfile).content)
             else:
                 nwm_file_sizes = os.path.getsize(jfile)

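One detail worth flagging in the size probe above: the bucket branch appends to the nwm_file_sizes list, while the new HTTPS and local branches assign a scalar, replacing the list. A hedged sketch that accumulates in every branch (fs, fs_type, convert_url2key, and nwm_forcing_files are the module's names, stubbed here so the snippet stands alone):

```python
# Sketch of the size probe with all three branches appending.
import os

import requests

def convert_url2key(url, fs_type):
    # Stand-in for the module's helper (assumed behavior).
    return url.split(".com/")[-1]

fs, fs_type = None, None                     # stand-ins for the module globals
nwm_forcing_files = ["/tmp/example.nc"]      # hypothetical local input

nwm_file_sizes = []
for j, jfile in enumerate(nwm_forcing_files):
    if j > 10: break                         # probe only the first few files
    if fs:
        key = convert_url2key(jfile, fs_type) if "https://" in jfile else jfile
        with fs.open(key, mode="rb") as f:
            nwm_file_sizes.append(f.details["size"])
    elif "https://" in jfile:
        nwm_file_sizes.append(len(requests.get(jfile).content))
    else:
        nwm_file_sizes.append(os.path.getsize(jfile))
```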