MPAS-JEDI
StatisticsDatabase.py
Go to the documentation of this file.
1 #!/usr/bin/env python3
2 import binning_utils as bu
3 from collections.abc import Iterable
4 from collections import defaultdict
5 from copy import deepcopy
6 import datetime as dt
7 import glob
8 import logging
9 import multiprocessing as mp
10 import numpy as np
11 import pandas as pd
12 import plot_utils as pu
13 import re
14 import os
15 import stat_utils as su
16 from typing import List
17 import var_utils as vu
18 
class DiagSpaceDict():
    '''Column-oriented container for statistics rows read from stats*.nc files.

    Each key of self.values maps to a length-nrows numpy array.  The keys are
    'expName', 'fcTDelta', 'cyDTime', plus the attribute names in
    su.fileStatAttributes and the statistic names in su.allFileStats.
    '''
    def __init__(self, nrows):
        # number of rows in every column array
        self.nrows = nrows
        self.values = {}
        # experiment/time identification columns (object-dtype arrays)
        self.values['expName'] = np.empty(nrows, np.chararray)
        self.values['fcTDelta'] = np.empty(nrows, dt.timedelta)
        self.values['cyDTime'] = np.empty(nrows, dt.datetime)
        for attribName in su.fileStatAttributes:
            self.values[attribName] = np.empty(nrows, np.chararray)
        for statName in su.allFileStats:
            # NOTE: np.float was removed in numpy>=1.24; builtin float is equivalent
            self.values[statName] = np.empty(nrows, float)

    @classmethod
    def read(cls, cyStatsFile, expName, fcTDelta, cyDTime):
        '''Construct a DiagSpaceDict from one statistics file, tagging every
        row with expName, fcTDelta, and cyDTime.'''
        statsDict = su.read_stats_nc(cyStatsFile)
        nrows = len(statsDict[su.fileStatAttributes[0]])
        statsDict['expName'] = np.full(nrows, expName)
        statsDict['fcTDelta'] = np.full(nrows, fcTDelta)
        statsDict['cyDTime'] = np.full(nrows, cyDTime)

        new = cls(nrows)
        for key, val in statsDict.items():
            assert key in new.values, "ERROR: DiagSpaceDict.read() "+key+" not in values"
            new.values[key][:] = val[:]
        return new

    @classmethod
    def concatasync(cls, asyncresults):
        '''Concatenate DiagSpaceDict objects produced by mp.Pool async results.'''
        # retrieve each AsyncResult exactly once instead of calling .get()
        # repeatedly for the row count and again for the insertion
        parts = [asyncresult.get() for asyncresult in asyncresults]
        nrows = sum(part.nrows for part in parts)

        new = cls(nrows)
        srow = 0
        for part in parts:
            new.insert(part, srow)
            srow += part.nrows
        return new

    def insert(self, other, srow):
        '''Copy every column of other into self starting at row srow.'''
        assert srow >= 0, ("Error: can only insert DiagSpaceDict rows >= 0, not ", srow)
        erow = srow + other.nrows - 1
        assert erow < self.nrows, ("Error: can only insert DiagSpaceDict rows < ", self.nrows, ", not ", erow)
        for key, val in other.values.items():
            if isinstance(val, Iterable):
                assert key in self.values, key+" not in DiagSpaceDict"
                self.values[key][srow:erow+1] = val[:]

    def destroy(self):
        '''Release the column arrays to reduce memory footprint.'''
        del self.values
70 
def dfIndexLevels(df, index):
    '''Return the unique values of one named level of df's MultiIndex.'''
    levelNames = df.index.names
    levelValues = df.index.get_level_values(levelNames.index(index))
    return pu.uniqueMembers(levelValues.tolist())
76 
77 
def dfVarVals(df, loc, var):
    '''Return the unique values of column var at location loc of df.'''
    vals = df.loc[loc, var]
    return pu.uniqueMembers(vals.tolist())
80 
81 
class StatsDB:
    '''A container class for a pandas DataFrame of
    statistics from multiple cycle and/or forecast times.
    '''
    def __init__(self, conf):
        ## Examples of directory structures from which this container can extract statistics.
        ## The stats*.nc files are produced by writediagstats_obsspace.py during cycling experiments.
        # Statistics file example for cycling runs with forecasts
        # (hasFCLenDir == True or self.fcTDeltas[-1] > self.fcTDeltas[0]):
        #statFile = '/glade/scratch/user/pandac/FC/3dvar/2018041500/{fcDirFormats}/diagnostic_stats/stats_omb_amsua_n19.nc'
        #  pieces:   expDirectory | expLongName | cyDTime | fcTDelta | statsFileSubDir | statsFilePrefix+DAMethod+DiagSpaceName

        # Statistics file example without a forecast-length directory
        # (hasFCLenDir == False and self.fcTDeltas[-1] == self.fcTDeltas[0]):
        #statFile = '/glade/scratch/user/pandac/DA/3dvar/2018041500/diagnostic_stats/stats_3dvar_bumpcov_amsua_n19.nc'
        #  pieces:   expDirectory | expLongName | cyDTime | statsFileSubDir | statsFilePrefix+DAMethod+DiagSpaceName

        # becomes True only after stats files are verified to exist below
        self.available = False

        # selected DiagSpace (ObsSpace name or ModelSpace name)
        self.DiagSpaceName = conf['DiagSpaceName']
        self.logger = logging.getLogger(__name__+'.'+self.DiagSpaceName)

        # cycle DateTimes
        firstCycleDTime = conf['firstCycleDTime']
        lastCycleDTime = conf['lastCycleDTime']
        cyTimeInc = conf['cyTimeInc']
        assert cyTimeInc > dt.timedelta(0), "cyTimeInc must be > 0"

        # forecast TimeDeltas
        fcTDeltaFirst = conf['fcTDeltaFirst']
        fcTDeltaLast = conf['fcTDeltaLast']
        fcTimeInc = conf['fcTimeInc']
        assert fcTimeInc > dt.timedelta(0), "fcTimeInc must be > 0"

        # experiment info
        self.expDirectory = conf['expDirectory']
        self.expLongNames = conf['expLongNames']
        self.expNames = conf['expNames']
        self.cntrlExpIndex = conf['cntrlExpIndex']
        # clip cntrlExpIndex to the valid range of expNames
        self.cntrlExpName = self.expNames[min([self.cntrlExpIndex, len(self.expNames)-1])]
        self.noncntrlExpNames = [x for x in self.expNames if x != self.cntrlExpName]
        self.logger.info('Control Experiment: '+self.cntrlExpName)
        # lazy %-formatting instead of passing a tuple positionally to info()
        self.logger.info('Non-control Experiment(s): %s', self.noncntrlExpNames)

        self.DAMethods = conf['DAMethods']

        self.diagnosticConfigs = conf['diagnosticConfigs']

        self.statsFileSubDirs = conf['statsFileSubDirs']

        fcDirFormats = conf['fcDirFormats']

        # enumerate forecast TimeDeltas and their per-experiment directory names
        self.fcTDeltas = []
        self.fcTDeltas_dir = defaultdict(list)
        self.fcTDeltas_totmin = []
        dumTimeDelta = fcTDeltaFirst
        while dumTimeDelta <= fcTDeltaLast:
            for expName, fcDirFormat in list(zip(self.expNames, fcDirFormats)):
                self.fcTDeltas_dir[expName] += [TDelta_dir(dumTimeDelta, fcDirFormat)]
            self.fcTDeltas_totmin.append(TDelta_dir(dumTimeDelta, "%m"))
            # self.fcTDeltas_totsec.append(TDelta_dir(dumTimeDelta, "%s"))

            self.fcTDeltas.append(dumTimeDelta)
            dumTimeDelta = dumTimeDelta + fcTimeInc

        # whether directory structure includes forecast length
        self.hasFCLenDir = conf['hasFCLenDir']
        if self.fcTDeltas[-1] > self.fcTDeltas[0]: self.hasFCLenDir = True

        # enumerate cycle DateTimes and their directory names (YYYYMMDDHH)
        self.cyDTimes_dir = []
        self.cyDTimes = []
        dumDateTime = firstCycleDTime
        while dumDateTime <= lastCycleDTime:
            cy_date_str = "{:04d}".format(dumDateTime.year) \
                        + "{:02d}".format(dumDateTime.month) \
                        + "{:02d}".format(dumDateTime.day) \
                        + "{:02d}".format(dumDateTime.hour)
            self.cyDTimes_dir.append(cy_date_str)
            self.cyDTimes.append(dumDateTime)
            dumDateTime = dumDateTime + cyTimeInc

        # Retrieve list of DiagSpaceNames from files available for all experiments
        # TODO: Only populate DiagSpaceNames if all cyDTimes and fcTDeltas meet
        #       these conditions:
        #       (1) all stats files are present
        #       (2) all stats files contain the same number of rows (easier)
        #           or equivalent rows (harder)
        # TODO: add the capability to calculate the stats when files are missing/incomplete
        #       and then output to correct file name
        expsDiagSpaceNames = []
        for expName, expLongName, statsFileSubDir, DAMethod in list(zip(
            self.expNames, self.expLongNames, self.statsFileSubDirs, self.DAMethods)):
            dateDir = self.cyDTimes_dir[0]
            if self.hasFCLenDir:
                dateDir = dateDir+'/'+self.fcTDeltas_dir[expName][0]

            FILEPREFIX0 = self.expDirectory+'/'+expLongName+'/'+dateDir+'/' \
                +statsFileSubDir+'/'+su.statsFilePrefix
            if DAMethod != '': FILEPREFIX0 += DAMethod+"_"

            DiagSpaceNames = []
            for File in glob.glob(FILEPREFIX0+'*.nc'):
                DiagSpaceName = File[len(FILEPREFIX0):-len('.nc')]
                if DiagSpaceName == self.DiagSpaceName:
                    DiagSpaceNames.append(DiagSpaceName)
            expsDiagSpaceNames.append(DiagSpaceNames)

        # Remove DiagSpaceNames that are not common to all experiments
        self.availDiagSpaceNames = deepcopy(expsDiagSpaceNames[0])
        if len(expsDiagSpaceNames) > 1:
            for expDiagSpaceNames in expsDiagSpaceNames[1:]:
                # BUG FIX: the original tested membership of each element in
                # the very list being iterated (always true), so nothing was
                # ever removed.  Compare the running intersection against each
                # experiment's list instead; iterate over a copy because we
                # remove from availDiagSpaceNames inside the loop.
                for DiagSpaceName in list(self.availDiagSpaceNames):
                    if DiagSpaceName not in expDiagSpaceNames:
                        self.availDiagSpaceNames.remove(DiagSpaceName)

        if (len(self.availDiagSpaceNames) < 1):
            self.logger.warning("stats files not available for creating a StatsDB "+
                "object for the selected DiagSpace => "+self.DiagSpaceName)
            return

        assert len(self.availDiagSpaceNames) == 1, (
            "\n\nERROR: only one DiagSpaceName per object is allowed.")

        self.available = True

    def read(self, np=1):
        '''Read all intermediate statistics files into a DataFrame (self.dfw).

        np -- requested number of worker processes.  NOTE(review): this
              parameter name shadows the numpy module inside this method;
              numpy is not used here, and the name is kept for backward
              compatibility with existing callers.
        '''
        if not self.available: return

        self.logger.info("=====================================================")
        self.logger.info("Construct pandas dataframe from static database files")
        self.logger.info("=====================================================")

        nprocs = min(mp.cpu_count(), np)

        # Read stats for this DiagSpaceName
        self.logger.info("Reading intermediate statistics files")
        self.logger.info("with "+str(nprocs)+" out of "+str(mp.cpu_count())+" processors")
        workers = mp.Pool(nprocs)
        dsDictParts = []
        for cyDTime, cyDTime_dir in list(zip(self.cyDTimes, self.cyDTimes_dir)):
            self.logger.info(" Working on cycle time "+str(cyDTime))
            missingFiles = []

            for expName, expLongName, statsFileSubDir, DAMethod in list(zip(
                self.expNames, self.expLongNames, self.statsFileSubDirs, self.DAMethods)):
                expPrefix = self.expDirectory+'/'+expLongName
                ncStatsFile = statsFileSubDir+'/'+su.statsFilePrefix
                if DAMethod != '': ncStatsFile += DAMethod+"_"
                ncStatsFile += self.DiagSpaceName+'.nc'
                for fcTDelta, fcTDelta_dir in list(zip(
                    self.fcTDeltas, self.fcTDeltas_dir[expName])):

                    # Read all stats/attributes from NC file for expName, fcTDelta, cyDTime
                    dateDir = cyDTime_dir
                    if self.hasFCLenDir:
                        dateDir = dateDir+'/'+fcTDelta_dir
                    cyStatsFile = expPrefix+'/'+dateDir+'/'+ncStatsFile

                    if os.path.exists(cyStatsFile):
                        dsDictParts.append(workers.apply_async(DiagSpaceDict.read,
                            args = (cyStatsFile, expName, fcTDelta, cyDTime)))
                    else:
                        missingFiles.append(cyStatsFile)

            if len(missingFiles) > 0:
                self.logger.warning("The following files do not exist. Matching times are excluded from the statistics.")
                for File in missingFiles:
                    self.logger.warning(File)
        workers.close()
        workers.join()

        self.logger.info("Concatenating statistics sub-dictionaries from multiple processors")
        dsDict = DiagSpaceDict.concatasync(dsDictParts)

        ## Convert dsDict to DataFrame
        self.logger.info("Constructing a dataframe from statistics dictionary")
        dsDF = pd.DataFrame.from_dict(dsDict.values)
        dsDict.destroy()
        del dsDictParts

        self.logger.info("Sorting the dataframe index")

        indexNames = ['expName', 'fcTDelta', 'cyDTime', 'DiagSpaceGrp',
                      'varName', 'diagName', 'binVar', 'binVal', 'binMethod']

        dsDF.set_index(indexNames, inplace=True)
        dsDF.sort_index(inplace=True)

        self.logger.info("Extracting index values")
        ## diagspace group
        self.DiagSpaceGrp = dsDF.index.levels[indexNames.index('DiagSpaceGrp')]

        # remove the DiagSpaceGrp dimension, because it's common across all rows
        #        expName      fcTDelta     cyDTime      DiagSpaceGrp          varName      diagName     binVar       binVal       binMethod
        dsLoc = (slice(None), slice(None), slice(None), self.DiagSpaceGrp[0], slice(None), slice(None), slice(None), slice(None), slice(None))
        self.dfw = DFWrapper(dsDF.xs(dsLoc))

        self.initAttributes()

        # add non-aggregated derived diagnostics as needed
        createORreplaceDerivedDiagnostics(self.dfw, self.diagnosticConfigs)

    def initAttributes(self):
        '''Derive convenience attributes (sorted varNames, channel numbers,
        units, bin values) from the current DataFrame index.'''
        ## diagnostics (currently unused)
        #self.containedDiagNames = self.dfw.levels('diagName')

        ## variables
        # get varNames and sort alphabetically
        varNames = self.dfw.levels('varName')
        nVars = len(varNames)
        indices = list(range(nVars))

        # sort by channel number (int) for radiances; the channel is the
        # trailing integer of each varName, if present
        chlist = ['']*nVars
        for ivar, varName in enumerate(varNames):
            for c in list(range(len(varName))):
                sub = varName[c:]
                if pu.isint(sub):
                    chlist[ivar] = int(sub)
                    break
        if '' in chlist:
            # at least one varName has no channel => alphabetical order
            indices.sort(key=varNames.__getitem__)
        else:
            indices.sort(key=chlist.__getitem__)
        self.varNames = list(map(varNames.__getitem__, indices))
        self.chlist = list(map(chlist.__getitem__, indices))

        ## extract units for each varName from varUnits DF column
        self.varUnitss = []
        varLoc = {}
        varLoc['fcTDelta'] = self.fcTDeltas[0]
        varLoc['cyDTime'] = self.cyDTimes[0]
        allDiags = self.dfw.levels('diagName', varLoc)
        varLoc['diagName'] = allDiags[0]

        for varName in self.varNames:
            varLoc['varName'] = varName
            units = self.dfw.uniquevals('varUnits', varLoc)
            assert len(units) == 1, ("\n\nERROR: too many units values for varName = "+varName,
                units, varLoc)
            self.varUnitss.append(units[0])

        ## bin values --> combination of numerical and string, all stored as strings
        self.allBinVals = self.dfw.levels('binVal')

        # convert allBinVals to numeric type that can be used as axes values;
        # non-numeric bins get a missing-value placeholder and are omitted
        # from binNumVals2DasStr
        self.binNumVals = []
        self.binNumVals2DasStr = []
        for binVal in self.allBinVals:
            if pu.isint(binVal):
                self.binNumVals.append(int(binVal))
                self.binNumVals2DasStr.append(binVal)
            elif pu.isfloat(binVal):
                self.binNumVals.append(float(binVal))
                self.binNumVals2DasStr.append(binVal)
            else:
                self.binNumVals.append(vu.miss_i)

    def appendDF(self, newDiagDF):
        '''Append a DataFrame of (derived) diagnostics and refresh attributes.'''
        self.dfw.append(newDiagDF)
        self.initAttributes()

    def loc(self, locDict, var=None):
        '''Return a new DFWrapper selected at locDict (optionally one var).'''
        return DFWrapper(self.dfw.loc(locDict, var))


    ## not used yet, but should work
    # def agg(self, aggovers=['cyDTime']):
    #     return DFWrapper(self.dfw.aggStats(groupby))
356 
def createORreplaceDerivedDiagnostics(dfw, diagnosticConfigs):
    '''(Re)generate rows for every derived diagnostic described in diagnosticConfigs.'''
    for diagName, config in diagnosticConfigs.items():
        if not config['derived']: continue
        # drop any pre-existing rows for this derived diagName from dfw
        if diagName in dfw.levels('diagName'):
            dfw.df.drop(diagName, level='diagName', inplace=True)

        # create then append a DataFrame holding the derived diagName
        derivedDiagDF = config['DFWFunction'](dfw, config['staticArg'])
        dfw.append(derivedDiagDF)
369 
370 
class DFWrapper:
    '''Thin wrapper around a MultiIndex pandas DataFrame of statistics,
    providing dict-based selection and aggregation helpers.'''
    def __init__(self, df):
        self.df = df
        self.indexNames = list(self.df.index.names)

    @classmethod
    def fromLoc(cls, other, locDict, var=None):
        '''Alternate constructor: select a subset of another DFWrapper.'''
        return cls(other.locdf(other.locTuple(locDict), var))

    @classmethod
    def fromAggStats(cls, other, aggovers):
        '''Alternate constructor: aggregate another DFWrapper over aggovers.'''
        return cls(other.aggStats(aggovers))

    def append(self, otherDF = None):
        '''Add otherDF (DataFrame object) to self.df.

        Adds new column names as needed and pads non-overlapping columns
        with NaN entries in both frames before concatenation.
        TODO: reduce memory footprint of NaN's via modifications to external data flows
        '''
        if otherDF is None: return

        appendDF = otherDF.copy(True)

        selfColumns = list(self.df.columns)
        appendColumns = list(appendDF.columns)

        # pad self.df with columns that only exist in appendDF
        # (np.NaN was removed in numpy 2.0; np.nan is the canonical spelling)
        selfNRows = len(self.df.index)
        for column in appendColumns:
            if column not in selfColumns:
                self.df.insert(len(list(self.df.columns)), column, [np.nan]*selfNRows)

        # pad appendDF with columns that only exist in self.df
        appendNRows = len(appendDF.index)
        for column in selfColumns:
            if column not in appendColumns:
                appendDF.insert(len(list(appendDF.columns)), column, [np.nan]*appendNRows)

        # DataFrame.append was removed in pandas 2.0; pd.concat is the
        # documented equivalent of df.append(other, sort=True)
        self.df = pd.concat([self.df, appendDF], sort=True)

    def locTuple(self, locDict={}):
        '''Build a tuple of per-level indexers from locDict.

        Levels missing from locDict (or mapped to None) select everything via
        slice(None); scalar values are wrapped in single-element lists so the
        selection keeps all index levels.  locDict is only read, never
        mutated, so the mutable default is safe here.
        '''
        for index in list(locDict.keys()):
            assert index in self.indexNames, (
                "\n\nERROR: index name not in the multiindex, index = "+index
                +", indexNames = ", self.indexNames)

        Loc = ()
        for index in self.indexNames:
            indL = list(Loc)
            if index not in locDict:
                indL.append(slice(None))
            elif locDict[index] is None:
                indL.append(slice(None))
            elif (isinstance(locDict[index], Iterable) and
                not isinstance(locDict[index], str)):
                indL.append(locDict[index])
            else:
                indL.append([locDict[index]])
            Loc = tuple(indL)
        return Loc

    def locdf(self, Loc, var=None):
        '''Select rows at Loc; all columns when var is None, else column(s) var.'''
        if var is None:
            return self.df.loc[Loc, :]
        else:
            return self.df.loc[Loc, var]

    def levels(self, index, locDict={}):
        '''Unique values of one index level within the locDict selection.'''
        newDF = self.locdf(self.locTuple(locDict))
        return dfIndexLevels(newDF, index)

    def loc(self, locDict, var=None):
        '''Select by dict of index-level values (see locTuple).'''
        return self.locdf(self.locTuple(locDict), var)

    def var(self, var):
        '''Select a full column.'''
        return self.loc({}, var=var)

    def uniquevals(self, var, locDict={}):
        '''Unique values of column var within the locDict selection.'''
        return pu.uniqueMembers(self.loc(locDict, var).tolist())

    def min(self, locDict, var=None):
        '''Minimum over the selection, ignoring NaN entries.'''
        return self.locdf(self.locTuple(locDict), var).dropna().min()

    def max(self, locDict, var):
        '''Maximum over the selection, ignoring NaN entries.'''
        return self.locdf(self.locTuple(locDict), var).dropna().max()

    def aggStats(self, aggovers):
        '''Aggregate statistics, grouping by all index levels except aggovers.'''
        groupby = deepcopy(self.indexNames)
        for aggover in aggovers:
            assert aggover in self.indexNames, (
                "\n\nERROR: aggover argument not in the multiindex, aggover = "+aggover
                +", indexNames = ", self.indexNames)
            if aggover in groupby: groupby.remove(aggover)
        return self.df.groupby(groupby).apply(su.aggStatsDF)
462 
463 
def TDelta_dir(tdelta, fmt):
    '''Render a timedelta as a directory-name string using %-style tokens.

    Supported tokens: %D (whole days), %HH/%MM/%SS (zero-padded clock parts
    of the sub-day remainder), %h (total hours), %MIN and %SEC (total minutes
    and the remaining seconds), %m (alias of %MIN), %s (total seconds).
    '''
    plain = '{:d}'
    pad2 = '{:02d}'

    # clock-style parts of the sub-day remainder: "%D %HH:%MM:%SS"
    clockHours, hourRem = divmod(tdelta.seconds, 3600)
    clockMinutes, clockSeconds = divmod(hourRem, 60)

    totalSeconds = int(tdelta.total_seconds())

    # totals: "%h", "%MIN:%SEC", "%m", "%s"
    totalHours = totalSeconds // 3600
    totalMinutes, secRem = divmod(totalSeconds, 60)

    # token -> (value, format); insertion order governs replacement order
    pieces = {
        'D': (tdelta.days, plain),
        'HH': (clockHours, pad2),
        'MM': (clockMinutes, pad2),
        'SS': (clockSeconds, pad2),
        'h': (totalHours, plain),
        'MIN': (totalMinutes, plain),
        'SEC': (secRem, pad2),
        'm': (totalMinutes, plain),
        's': (totalSeconds, plain),
    }

    out = fmt
    for token, (value, shape) in pieces.items():
        out = out.replace('%' + token, shape.format(value))

    return out
def fromAggStats(cls, other, aggovers)
def levels(self, index, locDict={})
def append(self, otherDF=None)
def loc(self, locDict, var=None)
def uniquevals(self, var, locDict={})
def min(self, locDict, var=None)
def locdf(self, Loc, var=None)
def max(self, locDict, var)
def fromLoc(cls, other, locDict, var=None)
def locTuple(self, locDict={})
def concatasync(cls, asyncresults)
def read(cls, cyStatsFile, expName, fcTDelta, cyDTime)
available
Examples of directory structures from which this container can extract statistics.
def appendDF(self, newDiagDF)
varUnitss
extract units for each varName from varUnits DF column
def loc(self, locDict, var=None)
DiagSpaceGrp
Convert dsDict to DataFrame.
allBinVals
bin values --> combination of numerical and string, all stored as strings
varNames
diagnostics (currently unused) self.containedDiagNames = self.dfw.levels('diagName')
def dfVarVals(df, loc, var)
def dfIndexLevels(df, index)
def TDelta_dir(tdelta, fmt)
def createORreplaceDerivedDiagnostics(dfw, diagnosticConfigs)