MPAS-JEDI
DiagnoseObsStatistics.py
Go to the documentation of this file.
1 #!/usr/bin/env python3
2 
3 import DiagnoseObsStatisticsArgs
4 
5 import binning_utils as bu
6 import predefined_configs as pconf
7 import config as conf
8 from copy import deepcopy
9 import diag_utils as du
10 import fnmatch
11 import glob
12 from JediDB import JediDB
13 from JediDBArgs import obsFKey
14 import logging
15 import logsetup
16 import multiprocessing as mp
17 from netCDF4 import Dataset
18 import numpy as np
19 import os
20 import stat_utils as su
21 import var_utils as vu
22 
23 _logger = logging.getLogger(__name__)
24 
class DiagnoseObsStatistics:
    '''
    Diagnose observation-space statistics
    Driven by
      - static selections in conf
      - command-line arguments in DiagnoseObsStatisticsArgs
    '''

    def __init__(self):
        '''
        Build the mean database and one database per ensemble member.

        Reads meanPath, ensemblePath and nMembers from
        DiagnoseObsStatisticsArgs.args.  The sorted keys of the mean
        database's Files are stored as the list of ObsSpaces to diagnose.
        '''
        self.name = 'DiagnoseObsStatistics'
        self.args = DiagnoseObsStatisticsArgs.args
        self.logger = logging.getLogger(self.name)

        # construct mean DB into 0th member slot
        self.logger.info('mean database: '+self.args.meanPath)
        self.jdbs = {vu.mean: JediDB(self.args.meanPath)}
        self.osKeys = sorted(self.jdbs[vu.mean].Files.keys())

        # construct ens member DBs into subsequent slots (when available)
        for member in range(1, self.args.nMembers+1):
            ensemblePath = str(self.args.ensemblePath).format(member)
            self.logger.info('adding member database: '+ensemblePath)
            self.jdbs[vu.ensSuffix(member)] = JediDB(ensemblePath)

    def diagnose(self, workers = None):
        '''
        conducts diagnoseObsSpace across multiple ObsSpaces in parallel

        workers - optional pool-like object providing apply_async; when None,
                  every ObsSpace is diagnosed serially in this process

        Returns the list of handles returned by workers.apply_async (empty
        in serial mode) so the caller can retrieve results and surface
        exceptions raised in the workers.
        '''
        results = []

        # Loop over all experiment+observation combinations (keys) alphabetically
        for osKey in self.osKeys:
            self.logger.info(osKey)
            if workers is None:
                self.diagnoseObsSpace(self.jdbs, osKey)
            else:
                results.append(workers.apply_async(
                    self.diagnoseObsSpace, args=(self.jdbs, osKey)))

        return results

    def diagnoseObsSpace(self, jdbs, osKey):
        '''
        Compute binned statistics for one ObsSpace and write them to file.

        jdbs  - dict of databases; jdbs[vu.mean] is the mean database and
                every other entry is treated as an ensemble member
        osKey - key of jdbs members to reference

        Raises ValueError when self.args.jediAppName is neither
        'variational' nor 'hofx'.
        '''
        logger = logging.getLogger(self.name+'.diagnoseObsSpace('+osKey+')')
        nMembers = len(jdbs)-1
        meanDB = jdbs[vu.mean]

        # initialize mean db file handles
        meanDB.initHandles(osKey)

        ###############################################
        ## Extract constructor info about the ObsSpace
        ###############################################

        ObsSpaceName = meanDB.ObsSpaceName[osKey]
        ObsSpaceInfo = conf.DiagSpaceConfig[ObsSpaceName]
        ObsSpaceGrp = ObsSpaceInfo['DiagSpaceGrp']
        binVarConfigs = ObsSpaceInfo.get('binVarConfigs',{})
        selectDiagNames = ObsSpaceInfo.get('diagNames',{})

        # create observed variable list by selecting those variables in the
        # obs feedback files (obsFKey) with the proper suffix
        appName = self.args.jediAppName
        if appName == 'variational':
            markerGroup = vu.depbgGroup
        elif appName == 'hofx':
            markerGroup = vu.hofxGroup
        else:
            # without a marker group there is nothing to select; fail with a
            # clear message instead of an unbound-variable error further down
            message = 'JEDI Application is not supported:: '+str(appName)
            logger.error(message)
            raise ValueError(message)
        obsVars = meanDB.varList(osKey, obsFKey, markerGroup)

        # looked up once; used by both the bin methods and the diagnostics
        fileFormat = meanDB.fileFormat(osKey, obsFKey)

        ########################################################
        ## Construct dictionary of binMethods for this ObsSpace
        ########################################################

        binMethods = {}

        for binVarKey, binMethodKeys in binVarConfigs.items():
            binVarConfig = pconf.binVarConfigs.get(binVarKey,pconf.nullBinVarConfig)
            for binMethodKey in binMethodKeys:
                config = binVarConfig.get(binMethodKey,pconf.nullBinMethod).copy()

                # skip bin methods that define no bin values or no filters
                if (len(config['values']) < 1 or
                    len(config['filters']) < 1): continue

                config['osName'] = ObsSpaceName
                config['fileFormat'] = fileFormat

                binMethods[(binVarKey,binMethodKey)] = bu.BinMethod(config)

        ######################################
        ## Construct diagnostic configurations
        ######################################

        diagnosticConfigs = du.diagnosticConfigs(
            selectDiagNames, ObsSpaceName,
            includeEnsembleDiagnostics = (nMembers > 1),
            fileFormat = fileFormat)

        #####################################################
        ## Generate comprehensive dict of required variables
        #####################################################

        dbVars = {vu.mean: [], vu.ensemble: []}
        for varName in obsVars:
            for diagName, diagnosticConfig in diagnosticConfigs.items():
                if 'ObsFunction' not in diagnosticConfig: continue
                outerIter = diagnosticConfig['outerIter']

                # variables for diagnostics
                for grpVar in diagnosticConfig['ObsFunction'].dbVars(
                    varName, outerIter):
                    for memberType in dbVars.keys():
                        if diagnosticConfig[memberType]:
                            dbVars[memberType].append(grpVar)

                # variables for binning
                # NOTE(review): kept inside the diagnostic loop because it
                # reads this diagnostic's outerIter - confirm against upstream
                # TODO: anIter grpVar's are not needed for all applications
                #       can save some reading time+memory by checking all diagnosticConfigs
                #       for required iterations before appending to dbVars[vu.mean] below
                for binMethod in binMethods.values():
                    for grpVar in binMethod.dbVars(varName, outerIter):
                        dbVars[vu.mean].append(grpVar)

        #####################################
        ## Read required variables from jdbs
        #####################################

        # read mean database variable values into memory
        dbVals = meanDB.readVars(osKey, dbVars[vu.mean])

        # destroy mean file handles
        meanDB.destroyHandles(osKey)

        # now for ensemble members
        for memStr, jdb in jdbs.items():
            if memStr == vu.mean: continue

            # initialize member db file handles
            jdb.initHandles(osKey)

            # read database variable values into memory; member values are
            # stored under the variable name suffixed with the member key
            memberDBVals = jdb.readVars(osKey, dbVars[vu.ensemble])
            for dbVar, vals in memberDBVals.items():
                dbVals[dbVar+memStr] = vals.copy()

            # destroy file handles
            jdb.destroyHandles(osKey)

        ######################################
        ## Collect statistics for all obsVars
        ######################################

        # Initialize a dictionary to contain all statistical info for this osKey
        statsDict = {}
        for attribName in su.fileStatAttributes:
            statsDict[attribName] = []
        for statName in su.allFileStats:
            statsDict[statName] = []

        # collect stats for all diagnosticConfigs
        for diagName, diagnosticConfig in sorted(diagnosticConfigs.items()):
            if 'ObsFunction' not in diagnosticConfig: continue

            logger.info('Calculating/writing diagnostic stats for:')
            logger.info('DIAG = '+diagName)
            Diagnostic = diagnosticConfig['ObsFunction']
            outerIter = diagnosticConfig['outerIter']

            for varName in obsVars:
                logger.info('VARIABLE = '+varName)

                varShort, varUnits = vu.varAttributes(varName)

                Diagnostic.evaluate(dbVals, varName, outerIter)
                diagValues = Diagnostic.result

                # warn when every value is NaN
                if len(diagValues)-np.isnan(diagValues).sum() == 0:
                    logger.warning('All missing values for diagnostic: '+diagName)

                for (binVarKey,binMethodKey), binMethod in binMethods.items():
                    if diagName in binMethod.excludeDiags: continue

                    binVarName, _ = vu.splitObsVarGrp(binVarKey)
                    binVarShort, binVarUnits = vu.varAttributes(binVarName)

                    # initialize binMethod filter function result
                    # NOTE: binning is performed using mean values
                    #       and not ensemble member values
                    binMethod.evaluate(dbVals, varName, outerIter)

                    for binVal in binMethod.values:
                        # apply binMethod filters for binVal
                        binnedDiagnostic = binMethod.apply(diagValues,diagName,binVal)

                        # store value and statistics associated with this bin
                        statsDict['binVal'].append(binVal)
                        statsVal = su.calcStats(binnedDiagnostic)
                        for statName in su.allFileStats:
                            statsDict[statName].append(statsVal[statName])

                        # store metadata common to all bins; appended once per
                        # bin so every statsDict list grows in step
                        statsDict['DiagSpaceGrp'].append(ObsSpaceGrp)
                        statsDict['varName'].append(varShort)
                        statsDict['varUnits'].append(varUnits)
                        statsDict['diagName'].append(diagName)
                        statsDict['binMethod'].append(binMethodKey)
                        statsDict['binVar'].append(binVarShort)
                        statsDict['binUnits'].append(binVarUnits)

                    #END binMethod.values LOOP
                #END binMethods tuple LOOP
            #END obsVars LOOP
        #END diagnosticConfigs LOOP

        ## Create a new stats file for osKey
        logger.info('Writing statistics file')
        su.write_stats_nc(osKey,statsDict)

        logger.info('Finished')
243 
244 #=========================================================================
245 # main program
def main():
    '''
    Build a DiagnoseObsStatistics instance and run it serially or in a pool.

    When args.nprocs is 1 the diagnosis runs in this process; otherwise a
    multiprocessing pool with args.nprocs workers is used.  Any exception
    raised in a worker is re-raised here, so the final "successfully"
    message is only logged when every task completed.
    '''
    _logger.info('Starting '+__name__)

    statistics = DiagnoseObsStatistics()

    if statistics.args.nprocs == 1:
        statistics.diagnose()
    else:
        # create pool of workers
        workers = mp.Pool(processes = statistics.args.nprocs)

        # diagnose statistics
        results = statistics.diagnose(workers)

        # wait for workers to finish
        workers.close()
        workers.join()

        # apply_async keeps worker exceptions inside its result handle;
        # get() re-raises them (diagnose may return no handles at all)
        for result in results or ():
            result.get()

    _logger.info('Finished '+__name__+' successfully')
265 
# run the diagnostics only when executed as a script, not when imported
if __name__ == '__main__':
    main()