2 import binning_utils
as bu
3 from collections.abc
import Iterable
4 from collections
import defaultdict
5 from copy
import deepcopy
9 import multiprocessing
as mp
12 import plot_utils
as pu
15 import stat_utils
as su
16 from typing
import List
17 import var_utils
as vu
23 self.
valuesvalues[
'expName'] = np.empty(nrows, np.chararray)
24 self.
valuesvalues[
'fcTDelta'] = np.empty(nrows, dt.timedelta)
25 self.
valuesvalues[
'cyDTime'] = np.empty(nrows, dt.datetime)
26 for attribName
in su.fileStatAttributes:
27 self.
valuesvalues[attribName] = np.empty(nrows, np.chararray)
28 for statName
in su.allFileStats:
29 self.
valuesvalues[statName] = np.empty(nrows, np.float)
32 def read(cls, cyStatsFile, expName, fcTDelta, cyDTime):
33 statsDict = su.read_stats_nc(cyStatsFile)
34 nrows = len(statsDict[su.fileStatAttributes[0]])
35 statsDict[
'expName'] = np.full(nrows, expName)
36 statsDict[
'fcTDelta'] = np.full(nrows, fcTDelta)
37 statsDict[
'cyDTime'] = np.full(nrows, cyDTime)
40 for key, val
in statsDict.items():
41 assert key
in new.values,
"ERROR: DiagSpaceDict.read() "+key+
" not in values"
42 new.values[key][:] = val[:]
48 for asyncresult
in asyncresults:
49 nrows += asyncresult.get().nrows
53 for asyncresult
in asyncresults:
54 new.insert(asyncresult.get(), srow)
55 srow += asyncresult.get().nrows
59 assert srow >= 0, (
"Error: can only insert DiagSpaceDict rows >= 0, not ", srow)
60 erow = srow + other.nrows - 1
61 assert erow < self.
nrowsnrows, (
"Error: can only insert DiagSpaceDict rows < ", self.
nrowsnrows,
", not ", erow)
62 for key, val
in other.values.items():
63 if isinstance(val, Iterable):
64 assert key
in self.
valuesvalues, key+
" not in DiagSpaceDict"
65 self.
valuesvalues[key][srow:erow+1] = val[:]
73 return pu.uniqueMembers(
74 df.index.get_level_values(
75 mi.index(index) ).tolist() )
79 return pu.uniqueMembers(df.loc[loc, var].tolist())
83 '''A container class for a pandas DataFrame of
84 statistics from multiple cycle and/or forecast times.
109 firstCycleDTime = conf[
'firstCycleDTime']
110 lastCycleDTime = conf[
'lastCycleDTime']
111 cyTimeInc = conf[
'cyTimeInc']
112 assert cyTimeInc > dt.timedelta(0),
"cyTimeInc must be > 0"
115 fcTDeltaFirst = conf[
'fcTDeltaFirst']
116 fcTDeltaLast = conf[
'fcTDeltaLast']
117 fcTimeInc = conf[
'fcTimeInc']
118 assert fcTimeInc > dt.timedelta(0),
"fcTimeInc must be > 0"
136 fcDirFormats = conf[
'fcDirFormats']
141 dumTimeDelta = fcTDeltaFirst
142 while dumTimeDelta <= fcTDeltaLast:
143 for expName, fcDirFormat
in list(zip(self.
expNamesexpNames, fcDirFormats)):
148 self.
fcTDeltasfcTDeltas.append(dumTimeDelta)
149 dumTimeDelta = dumTimeDelta + fcTimeInc
157 dumDateTime = firstCycleDTime
158 while dumDateTime <= lastCycleDTime:
159 cy_date_str =
"{:04d}".
format(dumDateTime.year) \
160 +
"{:02d}".
format(dumDateTime.month) \
161 +
"{:02d}".
format(dumDateTime.day) \
162 +
"{:02d}".
format(dumDateTime.hour)
164 self.
cyDTimescyDTimes.append(dumDateTime)
165 dumDateTime = dumDateTime + cyTimeInc
175 expsDiagSpaceNames = []
176 for expName, expLongName, statsFileSubDir, DAMethod
in list(zip(
182 FILEPREFIX0 = self.
expDirectoryexpDirectory+
'/'+expLongName +
'/'+dateDir+
'/' \
183 +statsFileSubDir+
'/'+su.statsFilePrefix
184 if DAMethod !=
'': FILEPREFIX0 += DAMethod+
"_"
187 for File
in glob.glob(FILEPREFIX0+
'*.nc'):
188 DiagSpaceName = File[len(FILEPREFIX0):-len(
'.nc')]
190 DiagSpaceNames.append(DiagSpaceName)
191 expsDiagSpaceNames.append(DiagSpaceNames)
195 if len(expsDiagSpaceNames) > 1:
196 for expDiagSpaceNames
in expsDiagSpaceNames[1:]:
197 for DiagSpaceName
in expDiagSpaceNames:
198 if DiagSpaceName
not in expDiagSpaceNames:
202 self.
loggerlogger.warning(
"stats files not available for creating a StatsDB"+
203 "object for the selected DiagSpace => "+self.
DiagSpaceNameDiagSpaceName)
207 "\n\nERROR: only one DiagSpaceName per object is allowed.")
214 self.
loggerlogger.info(
"=====================================================")
215 self.
loggerlogger.info(
"Construct pandas dataframe from static database files")
216 self.
loggerlogger.info(
"=====================================================")
218 nprocs = min(mp.cpu_count(), np)
221 self.
loggerlogger.info(
"Reading intermediate statistics files")
222 self.
loggerlogger.info(
"with "+str(nprocs)+
" out of "+str(mp.cpu_count())+
" processors")
223 workers = mp.Pool(nprocs)
226 self.
loggerlogger.info(
" Working on cycle time "+str(cyDTime))
229 for expName, expLongName, statsFileSubDir, DAMethod
in list(zip(
231 expPrefix = self.
expDirectoryexpDirectory+
'/'+expLongName
232 ncStatsFile = statsFileSubDir+
'/'+su.statsFilePrefix
233 if DAMethod !=
'': ncStatsFile += DAMethod+
"_"
235 for fcTDelta, fcTDelta_dir
in list(zip(
239 dateDir = cyDTime_dir
241 dateDir = dateDir+
'/'+fcTDelta_dir
242 cyStatsFile = expPrefix+
'/'+dateDir+
'/'+ncStatsFile
244 if os.path.exists(cyStatsFile):
245 dsDictParts.append(workers.apply_async(DiagSpaceDict.read,
246 args = (cyStatsFile, expName, fcTDelta, cyDTime)))
248 missingFiles.append(cyStatsFile)
250 if len(missingFiles) > 0:
251 self.
loggerlogger.warning(
"The following files do not exist. Matching times are excluded from the statistsics.")
252 for File
in missingFiles:
253 self.
loggerlogger.warning(File)
257 self.
loggerlogger.info(
"Concatenating statistics sub-dictionaries from multiple processors")
258 dsDict = DiagSpaceDict.concatasync(dsDictParts)
261 self.
loggerlogger.info(
"Constructing a dataframe from statistics dictionary")
262 dsDF = pd.DataFrame.from_dict(dsDict.values)
266 self.
loggerlogger.info(
"Sorting the dataframe index")
268 indexNames = [
'expName',
'fcTDelta',
'cyDTime',
'DiagSpaceGrp',
269 'varName',
'diagName',
'binVar',
'binVal',
'binMethod']
271 dsDF.set_index(indexNames, inplace=
True)
272 dsDF.sort_index(inplace=
True)
274 self.
loggerlogger.info(
"Extracting index values")
276 self.
DiagSpaceGrpDiagSpaceGrp = dsDF.index.levels[indexNames.index(
'DiagSpaceGrp')]
280 dsLoc = (slice(
None), slice(
None), slice(
None), self.
DiagSpaceGrpDiagSpaceGrp[0], slice(
None), slice(
None), slice(
None), slice(
None), slice(
None))
294 varNames = self.
dfwdfw.levels(
'varName')
295 nVars = len(varNames)
296 indices = list(range(nVars))
300 for ivar, varName
in enumerate(varNames):
301 for c
in list(range(len(varName))):
304 chlist[ivar] = int(sub)
307 indices.sort(key=varNames.__getitem__)
309 indices.sort(key=chlist.__getitem__)
310 self.
varNamesvarNames = list(map(varNames.__getitem__, indices))
311 self.
chlistchlist = list(map(chlist.__getitem__, indices))
316 varLoc[
'fcTDelta'] = self.
fcTDeltasfcTDeltas[0]
317 varLoc[
'cyDTime'] = self.
cyDTimescyDTimes[0]
318 allDiags = self.
dfwdfw.levels(
'diagName', varLoc)
319 varLoc[
'diagName'] = allDiags[0]
321 for varName
in self.
varNamesvarNames:
322 varLoc[
'varName'] = varName
323 units = self.
dfwdfw.uniquevals(
'varUnits', varLoc)
324 assert len(units) == 1, (
"\n\nERROR: too many units values for varName = "+varName,
338 elif pu.isfloat(binVal):
339 self.
binNumValsbinNumVals.append(float(binVal))
345 self.
dfwdfw.append(newDiagDF)
348 def loc(self, locDict, var=None):
358 for diagName, diagnosticConfig
in diagnosticConfigs.items():
359 if diagnosticConfig[
'derived']:
360 diagNames = dfw.levels(
'diagName')
361 if diagName
in diagNames:
363 dfw.df.drop(diagName, level=
'diagName', inplace=
True)
366 derivedDiagDF = diagnosticConfig[
'DFWFunction'](
367 dfw, diagnosticConfig[
'staticArg'])
368 dfw.append(derivedDiagDF)
378 return cls(other.locdf(other.locTuple(locDict), var))
382 return cls(other.aggStats(aggovers))
385 if otherDF
is None:
return
391 appendDF = otherDF.copy(
True)
393 selfColumns = list(self.
dfdf.columns)
394 appendColumns = list(appendDF.columns)
396 selfNRows = len(self.
dfdf.index)
397 for column
in appendColumns:
398 if column
not in selfColumns:
399 self.
dfdf.insert(len(list(self.
dfdf.columns)), column, [np.NaN]*selfNRows)
401 appendNRows = len(appendDF.index)
402 for column
in selfColumns:
403 if column
not in appendColumns:
404 appendDF.insert(len(list(appendDF.columns)), column, [np.NaN]*appendNRows)
406 self.
dfdf = self.
dfdf.
append(appendDF, sort=
True)
410 for index
in list(locDict.keys()):
412 "\n\nERROR: index name not in the multiindex, index = "+index
413 +
", indexNames = ", self.
indexNamesindexNames)
417 if index
not in locDict:
418 indL.append(slice(
None))
419 elif locDict[index]
is None:
420 indL.append(slice(
None))
421 elif (isinstance(locDict[index], Iterable)
and
422 not isinstance(locDict[index], str)):
423 indL.append(locDict[index])
425 indL.append([locDict[index]])
431 return self.
dfdf.loc[Loc,:]
433 return self.
dfdf.loc[Loc, var]
439 def loc(self, locDict, var=None):
443 return self.
loclocloc({}, var=var)
446 return pu.uniqueMembers(self.
loclocloc(locDict, var).tolist())
448 def min(self, locDict, var=None):
451 def max(self, locDict, var):
456 for aggover
in aggovers:
457 assert aggover
in self.
indexNamesindexNames, (
458 "\n\nERROR: aggover argument not in the multiindex, aggover = "+aggover
459 +
", indexNames = ", self.
indexNamesindexNames)
460 if aggover
in groupby: groupby.remove(aggover)
461 return self.
dfdf.groupby(groupby).apply(su.aggStatsDF)
471 subs[
"D"] = tdelta.days
474 subs[
"HH"], hrem = divmod(tdelta.seconds, 3600)
477 subs[
"MM"], subs[
"SS"] = divmod(hrem, 60)
481 ts = int(tdelta.total_seconds())
484 subs[
"h"], hrem = divmod(ts, 3600)
488 subs[
"MIN"], subs[
"SEC"] = divmod(ts, 60)
492 subs[
"m"] = subs[
"MIN"]
493 fmts[
"m"] = fmts[
"MIN"]
500 for key
in subs.keys():
501 out = out.replace(
"%"+key, fmts[key].
format(subs[key]))
def fromAggStats(cls, other, aggovers)
def levels(self, index, locDict={})
def append(self, otherDF=None)
def aggStats(self, aggovers)
def loc(self, locDict, var=None)
def uniquevals(self, var, locDict={})
def min(self, locDict, var=None)
def locdf(self, Loc, var=None)
def max(self, locDict, var)
def fromLoc(cls, other, locDict, var=None)
def locTuple(self, locDict={})
def __init__(self, nrows)
def concatasync(cls, asyncresults)
def insert(self, other, srow)
def read(cls, cyStatsFile, expName, fcTDelta, cyDTime)
available
Examples of directory structures from which this container can extract statistics.
def appendDF(self, newDiagDF)
varUnitss
extract units for each varName from varUnits DF column
def loc(self, locDict, var=None)
DiagSpaceGrp
Convert dsDict to DataFrame.
allBinVals
bin values --> combination of numerical and string, all stored as strings
varNames
diagnostics (currently unused) self.containedDiagNames = self.dfw.levels('diagName')
def dfVarVals(df, loc, var)
def dfIndexLevels(df, index)
def TDelta_dir(tdelta, fmt)
def createORreplaceDerivedDiagnostics(dfw, diagnosticConfigs)