3 import DiagnoseObsStatisticsArgs
5 import binning_utils
as bu
6 import predefined_configs
as pconf
8 from copy
import deepcopy
9 import diag_utils
as du
12 from JediDB
import JediDB
13 from JediDBArgs
import obsFKey
16 import multiprocessing
as mp
17 from netCDF4
import Dataset
20 import stat_utils
as su
21 import var_utils
as vu
23 _logger = logging.getLogger(__name__)
27 Diagnose observation-space statistics
29 - static selections in conf
30 - command-line arguments in DiagnoseObsStatisticsArgs
33 self.
namename =
'DiagnoseObsStatistics'
34 self.
argsargs = DiagnoseObsStatisticsArgs.args
35 self.
loggerlogger = logging.getLogger(self.
namename)
38 self.
loggerlogger.info(
'mean database: '+self.
argsargs.meanPath)
40 self.
osKeysosKeys = sorted(self.
jdbsjdbs[vu.mean].Files.keys())
43 for member
in list(range(1, self.
argsargs.nMembers+1)):
44 ensemblePath = str(self.
argsargs.ensemblePath).
format(member)
45 self.
loggerlogger.info(
'adding member database: '+ensemblePath)
46 self.
jdbsjdbs[vu.ensSuffix(member)] =
JediDB(ensemblePath)
50 conducts diagnoseObsSpace across multiple ObsSpaces in parallel
53 for osKey
in self.
osKeysosKeys:
54 self.
loggerlogger.info(osKey)
62 logger = logging.getLogger(self.
namename+
'.diagnoseObsSpace('+osKey+
')')
63 nMembers = len(jdbs)-1
66 jdbs[vu.mean].initHandles(osKey)
72 ObsSpaceName = jdbs[vu.mean].ObsSpaceName[osKey]
73 ObsSpaceInfo = conf.DiagSpaceConfig[ObsSpaceName]
74 ObsSpaceGrp = ObsSpaceInfo[
'DiagSpaceGrp']
75 binVarConfigs = ObsSpaceInfo.get(
'binVarConfigs',{})
76 selectDiagNames = ObsSpaceInfo.get(
'diagNames',{})
80 if self.
argsargs.jediAppName ==
'variational':
81 markerGroup = vu.depbgGroup
82 elif self.
argsargs.jediAppName ==
'hofx':
83 markerGroup = vu.hofxGroup
85 logger.error(
'JEDI Application is not supported:: '+self.
argsargs.jediAppName)
86 obsVars = jdbs[vu.mean].varList(osKey, obsFKey, markerGroup)
94 for binVarKey, binMethodKeys
in binVarConfigs.items():
95 binVarConfig = pconf.binVarConfigs.get(binVarKey,pconf.nullBinVarConfig)
96 for binMethodKey
in binMethodKeys:
97 config = binVarConfig.get(binMethodKey,pconf.nullBinMethod).copy()
99 if (len(config[
'values']) < 1
or
100 len(config[
'filters']) < 1):
continue
102 config[
'osName'] = ObsSpaceName
103 config[
'fileFormat'] = jdbs[vu.mean].fileFormat(osKey, obsFKey)
105 binMethods[(binVarKey,binMethodKey)] = bu.BinMethod(config)
112 diagnosticConfigs = du.diagnosticConfigs(
113 selectDiagNames, ObsSpaceName,
114 includeEnsembleDiagnostics = (nMembers > 1),
115 fileFormat = jdbs[vu.mean].fileFormat(osKey, obsFKey))
124 dbVars = {vu.mean: [], vu.ensemble: []}
125 for varName
in obsVars:
126 for diagName, diagnosticConfig
in diagnosticConfigs.items():
127 if 'ObsFunction' not in diagnosticConfig:
continue
130 for grpVar
in diagnosticConfig[
'ObsFunction'].dbVars(
131 varName, diagnosticConfig[
'outerIter']):
132 for memberType
in dbVars.keys():
133 if diagnosticConfig[memberType]:
134 dbVars[memberType].append(grpVar)
140 for (binVarKey,binMethodKey), binMethod
in binMethods.items():
141 for grpVar
in binMethod.dbVars(
142 varName, diagnosticConfig[
'outerIter']):
143 dbVars[vu.mean].append(grpVar)
151 dbVals = jdbs[vu.mean].readVars(osKey, dbVars[vu.mean])
154 jdbs[vu.mean].destroyHandles(osKey)
157 for memStr, jdb
in jdbs.items():
158 if memStr == vu.mean:
continue
161 jdb.initHandles(osKey)
164 memberDBVals = jdb.readVars(osKey, dbVars[vu.ensemble])
165 for dbVar, vals
in memberDBVals.items():
166 dbVals[dbVar+memStr] = vals.copy()
169 jdb.destroyHandles(osKey)
178 for attribName
in su.fileStatAttributes:
179 statsDict[attribName] = []
180 for statName
in su.allFileStats:
181 statsDict[statName] = []
184 for diagName, diagnosticConfig
in sorted(diagnosticConfigs.items()):
185 if 'ObsFunction' not in diagnosticConfig:
continue
187 logger.info(
'Calculating/writing diagnostic stats for:')
188 logger.info(
'DIAG = '+diagName)
189 Diagnostic = diagnosticConfig[
'ObsFunction']
190 outerIter = diagnosticConfig[
'outerIter']
192 for varName
in obsVars:
193 logger.info(
'VARIABLE = '+varName)
195 varShort, varUnits = vu.varAttributes(varName)
197 Diagnostic.evaluate(dbVals, varName, outerIter)
198 diagValues = Diagnostic.result
200 if len(diagValues)-np.isnan(diagValues).sum() == 0:
201 logger.warning(
'All missing values for diagnostic: '+diagName)
203 for (binVarKey,binMethodKey), binMethod
in binMethods.items():
204 if diagName
in binMethod.excludeDiags:
continue
206 binVarName, binGrpName = vu.splitObsVarGrp(binVarKey)
207 binVarShort, binVarUnits = vu.varAttributes(binVarName)
212 binMethod.evaluate(dbVals, varName, outerIter)
214 for binVal
in binMethod.values:
216 binnedDiagnostic = binMethod.apply(diagValues,diagName,binVal)
219 statsDict[
'binVal'].append(binVal)
220 statsVal = su.calcStats(binnedDiagnostic)
221 for statName
in su.allFileStats:
222 statsDict[statName].append(statsVal[statName])
225 statsDict[
'DiagSpaceGrp'].append(ObsSpaceGrp)
226 statsDict[
'varName'].append(varShort)
227 statsDict[
'varUnits'].append(varUnits)
228 statsDict[
'diagName'].append(diagName)
229 statsDict[
'binMethod'].append(binMethodKey)
230 statsDict[
'binVar'].append(binVarShort)
231 statsDict[
'binUnits'].append(binVarUnits)
239 logger.info(
'Writing statistics file')
240 su.write_stats_nc(osKey,statsDict)
242 logger.info(
'Finished')
247 _logger.info(
'Starting '+__name__)
251 if statistics.args.nprocs == 1:
252 statistics.diagnose()
255 workers = mp.Pool(processes = statistics.args.nprocs)
258 statistics.diagnose(workers)
264 _logger.info(
'Finished '+__name__+
' successfully')
266 if __name__ ==
'__main__':
main()
def diagnose(self, workers=None)
def diagnoseObsSpace(self, jdbs, osKey)