From 1b355a77b9452fe635d21a9ce91b33c60c88506f Mon Sep 17 00:00:00 2001 From: PennyHow Date: Tue, 4 Jun 2024 11:48:54 -0100 Subject: [PATCH] hard merging added --- src/pypromice/process/aws.py | 71 ++++++++++++++++++++++++++++-------- 1 file changed, 56 insertions(+), 15 deletions(-) diff --git a/src/pypromice/process/aws.py b/src/pypromice/process/aws.py index 1998a584..2ce64de2 100644 --- a/src/pypromice/process/aws.py +++ b/src/pypromice/process/aws.py @@ -76,6 +76,7 @@ def process(self): logger.info(f'Commencing {self.L0[0].attrs["number_of_booms"]}-boom processing...') self.getL1() self.getL2() + self.getL3() def write(self, outpath): '''Write L3 data to .csv and .nc file''' @@ -90,25 +91,18 @@ def getL1(self): logger.info('Level 1 processing...') self.L0 = [addBasicMeta(item, self.vars) for item in self.L0] self.L1 = [toL1(item, self.vars) for item in self.L0] - self.L1A = reduce(xr.Dataset.combine_first, self.L1) + + if self.merge_flag: + self.L1A = self.hard_merge(self.L1) + else: + self.L1A = reduce(xr.Dataset.combine_first, self.L1) def getL2(self): '''Perform L1 to L2 data processing''' logger.info('Level 2 processing...') self.L2 = toL2(self.L1A, vars_df=self.vars) - - # Resample L2 product - f = [l.attrs['format'] for l in self.L0] - if 'raw' in f or 'STM' in f: - logger.info('Resampling to 10 minute') - self.L2 = resampleL2(self.L2, '10min') - else: - self.L2 = resampleL2(self.L2, '60min') - logger.info('Resampling to hour') - - # Re-format time - t = self.L2['time'].values - self.L2['time'] = list(t) + self.L2 = self.resample(self.L2) + self.L2 = reformat_time(self.L2) # Switch gps_lon to negative (degrees_east) # Do this here, and NOT in addMeta, otherwise we switch back to positive @@ -122,13 +116,54 @@ def getL2(self): # Round all values to specified decimals places self.L2 = roundValues(self.L2, self.vars) - def getL3(self): '''Perform L2 to L3 data processing, including resampling and metadata and attribute population''' logger.info('Level 3 processing...') self.L3 = toL3(self.L2) + def resample(self, dataset): + '''Resample dataset to specific temporal resolution (based on input + data type)''' + f = [l.attrs['format'] for l in self.L0] + if 'raw' in f or 'STM' in f: + logger.info('Resampling to 10 minute') + resampled = resampleL2(dataset, '10min') + else: + resampled = resampleL2(dataset, '60min') + logger.info('Resampling to hour') + return resampled + + def merge_flag(self): + '''Determine if hard merging is needed, based on whether a hard + merge_type flag is defined in any of the configs''' + f = [l.attrs['merge_type'] for l in self.L0] + if 'hard' in f: + return True + else: + return False + + def hard_merge(self, dataset_list): + '''Determine positions where hard merging should occur, combine + data and append to list of combined data chunks, then hard merge all + combined data chunks. This should be called in instances where there + needs to be a clear break between input datasets, such as when a station + is moved (and we do not want the GPS position jumping)''' + # Define positions where hard merging should occur + m=[] + f = [l.attrs['merge_type'] for l in self.L0] + [m.append(i) for i, item in enumerate(f) if item=='hard'] + + # Perform combine between hard merge breaks and append to list of combined data + combined=[] + for i in range(len(m[:-1])): + combined.append(reduce(xr.Dataset.combine_first, dataset_list[m[i]:m[i+1]])) + combined.append(reduce(xr.Dataset.combine_first, dataset_list[m[-1]:])) + + # Hard merge all combined datasets together + return reduce(xr.Dataset.update, combined) + + def addAttributes(self, L3): '''Add variable and attribute metadata @@ -365,6 +400,12 @@ def getL0(infile, nodata, cols, skiprows, file_version, ds = xr.Dataset.from_dataframe(df) return ds +def reformat_time(dataset): + '''Re-format time''' + t = dataset['time'].values + dataset['time'] = list(t) + return dataset + def addBasicMeta(ds, vars_df): ''' Use a variable lookup table DataFrame to add the basic metadata to the xarray dataset. This is later amended to finalise L3