From c8f422a9b3d53e3a6e682ca786d5d373893c3fd6 Mon Sep 17 00:00:00 2001
From: Eli
Date: Fri, 15 Nov 2024 17:00:51 -0800
Subject: [PATCH] Fix USGS download: record a station as a success only when
 its parsed data is non-empty

parse_usgs_json now returns the parsed DataFrame so that download_station
can add a (station, param) pair to the success set only when the response
actually contains data; responses with an empty "timeSeries" are still
parsed (so they can be reported) but fall through to the failure handling.
Also take the inventory's agency column from the station database (keeping
the file-derived value as "source"), log the start date of each
populate_repo pass, and replace leftover print() calls with logger calls.
---
 dms_datastore/download_nwis.py | 17 +++++++++++------
 dms_datastore/inventory.py     |  7 +++++--
 dms_datastore/populate_repo.py | 13 +++++++------
 dms_datastore/write_ts.py      |  1 -
 4 files changed, 23 insertions(+), 15 deletions(-)

diff --git a/dms_datastore/download_nwis.py b/dms_datastore/download_nwis.py
index 4db95d7..b92dd28 100644
--- a/dms_datastore/download_nwis.py
+++ b/dms_datastore/download_nwis.py
@@ -211,7 +211,7 @@ def parse_usgs_json(parseinput,outfile,report_empty=False):
     #     subloc_yaml = yaml.dump(subloc_dict, default_flow_style=False)
 
     write_ts_csv(result_df,outfile,site_metadata,chunk_years=False)
-    return unique_qual
+    return result_df
 
 
 
@@ -219,13 +219,17 @@ def parse_usgs_json(parseinput,outfile,report_empty=False):
 def download_station(
     row, dest_dir, start, end, param, overwrite, endfile, successes, failures, skips
 ):
+    agency_id = row.agency_id
     station = row.station_id
     param = row.src_var_id
     paramname = row.param
     subloc = row.subloc
+
     if (station, paramname) in successes:
         return
+
+
     yearname = (
         f"{start.year}_{endfile}"  # if start.year != end.year else f"{start.year}"
     )
 
@@ -272,18 +276,19 @@ def download_station(
             station_html = response.read().decode().replace("\r", "")
         except:
             station_html = ""  # Catches incomplete read error
-        if len(station_html) > 80 and not "No sites found matching" in station_html:
-            found = True
-            logger.info(f"Parsing USGS JSON: {path}")
+        if (len(station_html) > 120 and "No sites found matching" not in station_html) or '"timeSeries":[]' in station_html:
+            logger.info(f"Parsing USGS JSON: {path} param {param}")
             try:
-                parse_usgs_json(station_html,path,report_empty=f"{station} {paramname} ({param})")
+                df = parse_usgs_json(station_html,path,report_empty=f"{station} {paramname} ({param})")
             except Exception as exc:
                 logger.info(f"Parsing of {station} {paramname} ({param}) JSON to csv failed")
                 with open(path, "w") as f:
                     f.write(station_html)
                 _quarantine_file(path)
                 raise
-            successes.add((station, paramname))
+            if df is not None and not df.empty:
+                found = True
+                successes.add((station, paramname))
         if not found:
             logger.debug(f"Station {station} query failed or produced no data")
             if (station, paramname) not in failures:
diff --git a/dms_datastore/inventory.py b/dms_datastore/inventory.py
index a1f2076..d8bfa6c 100644
--- a/dms_datastore/inventory.py
+++ b/dms_datastore/inventory.py
@@ -157,6 +157,9 @@ def repo_data_inventory(fpath,full=True,by="file_pattern"):
     metadf = pd.DataFrame(allmeta)
     metadf['original_filename'] = metadf.filename
     metadf['filename'] = metadf.apply(lambda x: to_wildcard(x.filename,remove_source=True),axis=1)
+    metadf['source'] = metadf['agency']
+
+    metadf.loc[:,'agency'] = station_db.loc[metadf.station_id,'agency'].to_numpy()
     double_year_format = "syear" in metadf.columns
     #meta2 = metadf.groupby(["station_id","subloc","param"]).first()
 
@@ -164,7 +167,7 @@ def repo_data_inventory(fpath,full=True,by="file_pattern"):
     # todo: is a groupby necessary for double year format? are there duplicates?
         grouped_meta = metadf.groupby(["station_id","subloc","param"],dropna=False).agg(
             {
-                "agency": lambda ser: reduce(prioritize_source,ser),
+                "agency": ['first'],
                 "agency_id":['first'],
                 "syear":['min'],
                 "eyear":['max'],
@@ -175,7+178,7 @@ def repo_data_inventory(fpath,full=True,by="file_pattern"):
     else:
         grouped_meta = metadf.groupby(["station_id","subloc","param"],dropna=False).agg(
             {
-                "agency": lambda ser: reduce(prioritize_source,ser),
+                "agency": ['first'],
                 "agency_id":['first'],
                 "year":['min','max'],
                 "filename": ['first'],
diff --git a/dms_datastore/populate_repo.py b/dms_datastore/populate_repo.py
index 2b067ae..6d0f864 100644
--- a/dms_datastore/populate_repo.py
+++ b/dms_datastore/populate_repo.py
@@ -438,7 +438,7 @@ def populate(dest, all_agencies=None, varlist=None, partial_update=False):
         for var in varlist:
             if not partial_update:
                 logger.info(
-                    f"Calling populate_repo (1) with agency {agency} variable: {var}"
+                    f"Calling populate_repo (1) with agency {agency} variable: {var} start: 1980-01-01"
                 )
                 populate_repo(
                     agency,
@@ -449,7 +449,7 @@ def populate(dest, all_agencies=None, varlist=None, partial_update=False):
                     ignore_existing=ignore_existing,
                 )
             logger.info(
-                f"Calling populate_repo (2) with agency {agency} variable: {var}"
+                f"Calling populate_repo (2) with agency {agency} variable: {var} start: 2000-01-01"
             )
             populate_repo(
                 agency,
@@ -460,9 +460,10 @@ def populate(dest, all_agencies=None, varlist=None, partial_update=False):
                 ignore_existing=ignore_existing,
             )
             logger.info(
-                f"Calling populate_repo (3) with agency {agency} variable: {var}"
+                f"Calling populate_repo (3) with agency {agency} variable: {var} start: 2020-01-01"
             )
             end_download = pd.Timestamp(2039,12,31,23,59) if ((agency == "noaa") and (var == "predictions")) else None
+
             populate_repo(
                 agency, var, dest, pd.Timestamp(2020, 1, 1), end_download, overwrite=True
             )
@@ -471,11 +472,11 @@ def populate(dest, all_agencies=None, varlist=None, partial_update=False):
                     os.path.join(dest, f"{agency}*_{var}_*.{ext}")
                 )
             logger.info(f"Done with agency {agency} variable: {var}")
-        print(f"Done with agency {agency} for all variables")
+        logger.info(f"Done with agency {agency} for all variables")
         doneagency.append(agency)
-    print("Completed population for these agencies: ")
+    logger.info("Completed population for these agencies: ")
     for agent in doneagency:
-        print(agent)
+        logger.info(agent)
 
 
 def purge(dest):
diff --git a/dms_datastore/write_ts.py b/dms_datastore/write_ts.py
index c226256..47a322a 100644
--- a/dms_datastore/write_ts.py
+++ b/dms_datastore/write_ts.py
@@ -128,7 +128,6 @@ def write_ts_csv(ts,fpath,metadata=None,chunk_years=False,format_version="dwr-dm
                     pass
                     #print(f"Year already in file name for file {newfname}")
                 with open(newfname,'w',newline="\n") as outfile:
-                    print(meta_header)
                     outfile.write(meta_header)
                     tssub.to_csv(outfile,header=True,sep=",",date_format="%Y-%m-%dT%H:%M:%S",**kwargs)
             else:  # not chunk_years
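
Reviewer notes (illustrative sketches, not part of the patch):

A minimal sketch of the success/failure bookkeeping the download_nwis.py
change is meant to produce, assuming parse_usgs_json returns an empty
DataFrame for an empty "timeSeries" payload. fake_parse, the station id,
and the sample payloads are hypothetical stand-ins for parse_usgs_json and
real NWIS responses:

    # A (station, param) pair is recorded as a success only when the parsed
    # frame is non-empty; otherwise it lands in the failure set so a later
    # populate_repo pass can retry it.
    import pandas as pd

    def fake_parse(payload):
        # Hypothetical stand-in for parse_usgs_json:
        # an empty timeSeries yields an empty frame.
        if '"timeSeries":[]' in payload:
            return pd.DataFrame()
        return pd.DataFrame({"value": [1.0]})

    successes, failures = set(), set()
    for key, payload in [
        (("11455420", "flow"), '{"value":{"timeSeries":[{"values":[1]}]}}'),
        (("11455420", "temp"), '{"value":{"timeSeries":[]}}'),
    ]:
        df = fake_parse(payload)
        if df is not None and not df.empty:
            successes.add(key)
        elif key not in failures:
            failures.add(key)

    print(sorted(successes))  # [('11455420', 'flow')]
    print(sorted(failures))   # [('11455420', 'temp')]

And a sketch of the inventory.py change: the agency parsed from the file
name is kept as 'source', while 'agency' is looked up again from the station
database by station_id. The station_db and metadf frames below are made-up
examples; .to_numpy() strips the index from the lookup result so the
assignment aligns by position rather than by label:

    import pandas as pd

    station_db = pd.DataFrame({"agency": ["usgs", "dwr"]},
                              index=["11455420", "B91560"])
    metadf = pd.DataFrame({"station_id": ["11455420", "B91560", "11455420"],
                           "agency": ["usgs_web", "cdec", "usgs_web"]})

    metadf["source"] = metadf["agency"]
    # One lookup row comes back per metadf row (duplicate station_ids are
    # fine because station_db's index is unique); to_numpy() drops the
    # index so the values assign positionally.
    metadf.loc[:, "agency"] = station_db.loc[metadf.station_id, "agency"].to_numpy()
    print(metadf)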