hard merging added
PennyHow committed Jun 4, 2024
1 parent 59bf8dd commit 1b355a7
Showing 1 changed file with 56 additions and 15 deletions.
71 changes: 56 additions & 15 deletions src/pypromice/process/aws.py
@@ -76,6 +76,7 @@ def process(self):
        logger.info(f'Commencing {self.L0[0].attrs["number_of_booms"]}-boom processing...')
        self.getL1()
        self.getL2()
        self.getL3()

    def write(self, outpath):
        '''Write L3 data to .csv and .nc file'''
@@ -90,25 +91,18 @@ def getL1(self):
        logger.info('Level 1 processing...')
        self.L0 = [addBasicMeta(item, self.vars) for item in self.L0]
        self.L1 = [toL1(item, self.vars) for item in self.L0]
        self.L1A = reduce(xr.Dataset.combine_first, self.L1)

        if self.merge_flag():
            self.L1A = self.hard_merge(self.L1)
        else:
            self.L1A = reduce(xr.Dataset.combine_first, self.L1)

    def getL2(self):
        '''Perform L1 to L2 data processing'''
        logger.info('Level 2 processing...')
        self.L2 = toL2(self.L1A, vars_df=self.vars)

        # Resample L2 product
        f = [l.attrs['format'] for l in self.L0]
        if 'raw' in f or 'STM' in f:
            logger.info('Resampling to 10 minute')
            self.L2 = resampleL2(self.L2, '10min')
        else:
            self.L2 = resampleL2(self.L2, '60min')
            logger.info('Resampling to hour')

        # Re-format time
        t = self.L2['time'].values
        self.L2['time'] = list(t)
        self.L2 = self.resample(self.L2)
        self.L2 = reformat_time(self.L2)

        # Switch gps_lon to negative (degrees_east)
        # Do this here, and NOT in addMeta, otherwise we switch back to positive
@@ -122,13 +116,54 @@ def getL2(self):
        # Round all values to specified decimal places
        self.L2 = roundValues(self.L2, self.vars)


    def getL3(self):
        '''Perform L2 to L3 data processing, including resampling and metadata
        and attribute population'''
        logger.info('Level 3 processing...')
        self.L3 = toL3(self.L2)

    def resample(self, dataset):
        '''Resample dataset to specific temporal resolution (based on input
        data type)'''
        f = [l.attrs['format'] for l in self.L0]
        if 'raw' in f or 'STM' in f:
            logger.info('Resampling to 10 minute')
            resampled = resampleL2(dataset, '10min')
        else:
            resampled = resampleL2(dataset, '60min')
            logger.info('Resampling to hour')
        return resampled
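
A minimal standalone sketch (not part of this commit) of what the 10 minute versus hourly resampling choice amounts to with plain xarray; the actual resampleL2 function in pypromice applies its own per-variable aggregation rules, and the variable name 't_u' here is illustrative only.

    import numpy as np
    import pandas as pd
    import xarray as xr

    # Hypothetical 5-minute input data
    time = pd.date_range('2024-06-01', periods=12, freq='5min')
    ds = xr.Dataset({'t_u': ('time', np.arange(12.0))}, coords={'time': time})

    ten_min = ds.resample(time='10min').mean()   # raw/STM-type input -> 10 minute means
    hourly = ds.resample(time='60min').mean()    # transmitted input -> hourly means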

    def merge_flag(self):
        '''Determine if hard merging is needed, based on whether a hard
        merge_type flag is defined in any of the configs'''
        f = [l.attrs['merge_type'] for l in self.L0]
        return 'hard' in f
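
A small self-contained sketch (with hypothetical attribute values) of the flag check above; in the pipeline, the merge_type attribute on each L0 dataset comes from the station configs.

    import xarray as xr

    # Hypothetical L0 datasets carrying a merge_type attribute
    ds_a = xr.Dataset(attrs={'merge_type': 'soft'})
    ds_b = xr.Dataset(attrs={'merge_type': 'hard'})

    f = [d.attrs['merge_type'] for d in [ds_a, ds_b]]
    print('hard' in f)   # True, so hard_merge() is used instead of a plain combine_first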

    def hard_merge(self, dataset_list):
        '''Determine positions where hard merging should occur, combine
        data and append to list of combined data chunks, then hard merge all
        combined data chunks. This should be called in instances where there
        needs to be a clear break between input datasets, such as when a station
        is moved (and we do not want the GPS position jumping)'''
        # Define positions where hard merging should occur
        f = [l.attrs['merge_type'] for l in self.L0]
        m = [i for i, item in enumerate(f) if item == 'hard']

        # Perform combine between hard merge breaks and append to list of combined data
        combined = []
        for i in range(len(m) - 1):
            combined.append(reduce(xr.Dataset.combine_first, dataset_list[m[i]:m[i+1]]))
        combined.append(reduce(xr.Dataset.combine_first, dataset_list[m[-1]:]))

        # Hard merge all combined datasets together
        return reduce(xr.Dataset.update, combined)
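
A short standalone sketch (hypothetical merge_type values, not part of this commit) of how the break positions above translate into chunks: everything between consecutive 'hard' flags is combined with combine_first, and the resulting chunks are then stitched with Dataset.update, so that values from later chunks take priority where variables overlap, giving the clean break described in the docstring.

    # One merge_type entry per L0 input file (hypothetical values)
    merge_types = ['hard', 'soft', 'soft', 'hard', 'soft']

    # Positions of the hard breaks, as in hard_merge() above
    m = [i for i, item in enumerate(merge_types) if item == 'hard']   # -> [0, 3]

    # Chunks of input files combined with combine_first before the hard merge
    chunks = [list(range(m[i], m[i + 1])) for i in range(len(m) - 1)]
    chunks.append(list(range(m[-1], len(merge_types))))
    print(chunks)   # [[0, 1, 2], [3, 4]]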


    def addAttributes(self, L3):
        '''Add variable and attribute metadata
@@ -365,6 +400,12 @@ def getL0(infile, nodata, cols, skiprows, file_version,
    ds = xr.Dataset.from_dataframe(df)
    return ds

def reformat_time(dataset):
    '''Re-format the time coordinate of the dataset'''
    t = dataset['time'].values
    dataset['time'] = list(t)
    return dataset

def addBasicMeta(ds, vars_df):
    ''' Use a variable lookup table DataFrame to add the basic metadata
    to the xarray dataset. This is later amended to finalise L3
