MPAS-JEDI
JediDB.py
Go to the documentation of this file.
1 #!/usr/bin/env python3
2 
3 from collections import defaultdict
4 import config as conf
5 from copy import deepcopy
6 import glob
7 from JediDBArgs import \
8  obsFKey, geoFKey, diagFKey, default_path, fPrefixes
9 import h5py as h5
10 import logging
11 from netCDF4 import Dataset
12 import numpy as np
13 import os
14 import plot_utils as pu
15 import var_utils as vu
16 import sys
17 
18 _logger = logging.getLogger(__name__)
19 
20 MAXINT32 = np.int32(1e9)
21 MAXFLOAT = np.float32(1.e12)
22 MAXDOUBLE = np.float64(1.e12)
23 
24 def getIODAFileRank(pathPlusFile):
25  # pathPlusFile = PATH/fileName
26  # fileName = prefix_suffix.fileExt
27  # suffix may or may not be a numerical value indicating processing element rank
28  # return the rank if any
29 
30  # fileName excludes the path
31  fileName = pathPlusFile.split('/')[-1]
32 
33  # fileParts as list, removing extension (after '.') first
34  fileParts = (fileName.split('.')[0]).split('_')
35 
36  rank = fileParts[-1]
37 
38  # check if suffix is numerical, indicating the rank number
39  if not pu.isint(rank):
40  return 'none'
41  else:
42  return rank
43 
44 
45 def IODAFileIsRanked(pathPlusFile):
46  return pu.isint(getIODAFileRank(pathPlusFile))
47 
48 
49 def getIODAFileHandle(File, mode='r'):
50  # returns a FileHandle object for File with permissions declared in mode
51  ext = File.split('.')[-1]
52  isNC = (ext in 'nc4')
53  isHDF = (ext == 'h5')
54  if isNC:
55  return NCFileHandle(File, mode)
56  elif isHDF:
57  return HDFFileHandle(File, mode)
58  else:
59  _logger.error('getIODAFileHandle:unsupported file extension => '+ext)
60 
61 
62 class FileHandles():
63  '''wrapper for list of FileHandle objects'''
64  def __init__(self, files, mode):
65  self.handleshandles = []
66  for File in files:
67  self.handleshandles.append(getIODAFileHandle(File, mode))
68 
69  self.sizesize = len(self.handleshandles)
70 
71  self.nlocsnlocs = 0
72  for h in self.handleshandles:
73  self.nlocsnlocs += h.nlocs
74 
75  self.variablesvariables = self.handleshandles[0].variables
76  self.fileFormatfileFormat = self.handleshandles[0].fileFormat
77 
78  def close(self):
79  for h in self.handleshandles:
80  h.close()
81 
82  def varsINGroup(self, group):
83  # comprehensive list of variables in group
84  filevars = self.handleshandles[0].varsINGroup(group)
85 
86  # only include variables with finite data
87  varlist = []
88  for var in filevars:
89  grpVar = vu.IODAVarCtors[self.fileFormatfileFormat](var, group)
90  var1D = self.var1DatLocsvar1DatLocsvar1DatLocs(grpVar)
91  if np.isfinite(var1D).sum() > 0: varlist.append(var)
92 
93  return varlist
94 
95  def datatype(self, var):
96  return self.handleshandles[0].datatype(var)
97 
98  def var1DatLocs(self, grpVar, level = None):
99  varName, group = vu.splitObsVarGrp(grpVar)
100 
101  # GeoVaLs and ObsDiagnostics do not have ObsGroups in the file variable names
102  if group==vu.geoGroup or group==vu.diagGroup:
103  fileVar = varName
104  else:
105  fileVar = grpVar
106 
107  dtype = self.datatypedatatypedatatype(fileVar)
108 
109  if fileVar not in self.variablesvariables:
110  _logger.error('FileHandles.var1DatLocs: fileVar not present: '+fileVar)
111 
112  if 'byte' in dtype:
113  vals1D = np.empty(self.nlocsnlocs, dtype=np.object_)
114  istart = 0
115  for h in self.handleshandles:
116  iend = istart + h.nlocs
117  tmp = np.empty(h.nlocs, dtype=np.object_)
118  for ii, bytelist in enumerate(h.variableAS1DArray(fileVar)):
119  tmp[ii] = (b''.join(bytelist)).decode('utf-8')
120  vals1D[istart:iend] = tmp
121  istart = iend
122  else:
123  vals1D = np.empty(self.nlocsnlocs, dtype=dtype)
124  istart = 0
125  for h in self.handleshandles:
126  iend = istart + h.nlocs
127  if level is None:
128  vals1D[istart:iend] = h.variableAS1DArray(fileVar)
129  else:
130  # GeoVaLs and ObsDiagnostics are always stored as 2D
131  vals1D[istart:iend] = np.transpose(np.asarray(h.variableAS1DArray(fileVar)))[level][:]
132  istart = iend
133 
134  assert iend == self.nlocsnlocs, (
135  'FileHandles.var1DatLocs: incorrect nlocs (',iend,'!=',nlocs,') for ', grpVar)
136 
137  # convert pressure from Pa to hPa if needed (kludge)
138  if (vu.obsVarPrs in grpVar and np.max(vals1D) > 10000.0):
139  vals1D = np.divide(vals1D, 100.0)
140 
141  # missing data
142  if 'int32' in dtype:
143  missing = np.greater(np.abs(vals1D), MAXINT32)
144  elif 'float32' in dtype:
145  missing = np.greater(np.abs(vals1D), MAXFLOAT)
146  elif 'float64' in dtype:
147  missing = np.greater(np.abs(vals1D), MAXDOUBLE)
148  else:
149  missing = np.full_like(vals1D, False, dtype=bool)
150 
151  if 'float' in dtype:
152  vals1D[missing] = np.NaN
153 
154  #TODO: missing value handling for integers, strings, and others?
155 
156  return vals1D
157 
158 
159 class FileHandle():
160  '''This serves as a base class for netcdf4 and hdf5 wrapper classes for IODA-formatted and UFO GeoVaLs-formatted files'''
161  def __init__(self, File, mode):
162  self.variablesvariables = {}
163  self.FileFile = File
164  self.modemode = mode
165 
166  def datatype(self, var):
167  '''
168  virtual method, returns the basic data type of var
169  '''
170  raise NotImplementedError()
171 
172  def variableAS1DArray(self, var):
173  '''
174  virtual method, returns var as a 1D array
175  '''
176  raise NotImplementedError()
177 
178  def close(self):
179  del self.variablesvariables
180 
181  def varsINGroup(self, group):
182  # returns all variables in an ObsGroup with the ObsGroup label removed
183  assert group!=vu.geoGroup and group!=vu.diagGroup, (
184  'varsINGroup cannot handle GeoVaLs or ObsDiagnostics')
185  varlist = []
186  for vargrp in self.variablesvariables.keys():
187  var, grp = vu.splitObsVarGrp(vargrp)
188  if grp == group: varlist.append(var)
189  return varlist
190 
191 
192 # NETCDF4
193 # ObsSpace IODA-v1, GeoVaLS, ObsDiagnostics
194 class NCFileHandle(FileHandle):
195  def __init__(self, File, mode):
196  super().__init__(File, mode)
197 
198  self.fileFormatfileFormat = vu.ncFileFormat
199  self.hh = Dataset(File, mode)
200  self.hh.set_auto_mask(False)
201  self.nlocsnlocs = self.hh.dimensions['nlocs'].size
202 
203  self.variablesvariablesvariables = self.hh.variables
204 
205  def datatype(self, var):
206  return self.hh.variables[var].datatype.name
207 
208  def variableAS1DArray(self, var):
209  return self.hh.variables[var][:]
210 
211  def close(self):
212  super().close()
213  self.hh.close()
214 
215 
216 # HDF5, including 2D variables
217 # ObsSpace IODA-v2
218 class HDF2DtoArray1D():
219  def __init__(self, h, var2D, index):
220  self.hh = h
221  self.var2Dvar2D = var2D
222  self.indexindex = index
223 
224  def get(self):
225  return self.hh[self.var2Dvar2D][:,self.indexindex]
226 
228  def __init__(self, File, mode):
229  super().__init__(File, mode)
230 
231  self.fileFormatfileFormat = vu.hdfFileFormat
232  self.hh = h5.File(File, mode)
233  self.nlocsnlocs = self.hh['nlocs'].size
234 
235  varlist = []
236  for node in self.hh:
237  if type(self.hh[node]) is h5._hl.group.Group:
238  for var in self.hh[node]:
239  varlist += [node+'/'+var]
240 
241  self.variablesvariablesvariables = {}
242  for var in varlist:
243  shape = self.hh[var].shape
244 
245  # retrieve dimensions
246  dims = np.empty(len(shape), object)
247  for ii, dim in enumerate(self.hh[var].attrs['DIMENSION_LIST']):
248  dims[ii] = self.hh[dim[0]]
249  assert len(dims[ii]) == shape[ii], ('HDFFileHandle.init: len(dims[ii]) and shape[ii] do not match, ii = ', ii)
250  if dims[0] != self.hh['nlocs']: continue
251 
252  if len(shape) == 1:
253  self.variablesvariablesvariables[var] = self.hh[var]
254  elif len(shape) == 2:
255  # assign proxy HDF2DtoArray1D objects along second dimension
256  for ii, d2Value in enumerate(dims[1]):
257  self.variablesvariablesvariables[vu.appendSuffix(var, d2Value)] = HDF2DtoArray1D(self.hh, var, ii)
258  else:
259  _logger.error('HDFFileHandle.init: unable to handle more than 2 dimensions')
260 
261  def datatype(self, var):
262  # first branch of logic accounts for the fact that some variables are stored in hdf5 files with a channel suffix
263  if var in self.hh:
264  fileVarName = var
265  # second branch of logic removes the channel suffix that was artifically added in self.variables
266  else:
267  varName, grp = vu.splitObsVarGrp(var)
268  fileVarName, suf = vu.splitIntSuffix(varName)
269  fileVarName = grp+'/'+fileVarName
270  return self.hh[fileVarName].dtype.name
271 
272  def variableAS1DArray(self, var):
273  if type(self.variablesvariablesvariables[var]) is h5._hl.dataset.Dataset:
274  return self.variablesvariablesvariables[var][:]
275  elif type(self.variablesvariablesvariables[var]) is HDF2DtoArray1D:
276  return self.variablesvariablesvariables[var].get()
277  else:
278  _logger.error('HDFFileHandle.variableAS1DArray: unsupported type')
279 
280  def close(self):
281  super().close()
282  self.hh.close()
283 
284 
285 class JediDB:
286  '''This class provides access to UFO feedback files.'''
287  def __init__(self, data_path=default_path, osKeySelect=[]):
288  # data_path: location of feedback files
289  # fileExt: extention of feedback files
290  # osKeySelect (optional): allows for the selection of particular osKey's
291 
292  supportedFileExts=['nc4','h5']
293 
294  self.filePrefixesfilePrefixes = {}
295  for key, prefix in fPrefixes.items():
296  self.filePrefixesfilePrefixes[key] = prefix
297 
298  # Group all files by experiment-ObsSpace combination
299  # Two possible formats for multi-processor or single-processor/concatenated
300  # multi-processor: self.filePrefixes[fileType]+"_"+osKey+"_"+rank+"."+fileExt
301  # single-processor: self.filePrefixes[fileType]+"_"+osKey+"."+fileExt
302 
303  # osKey includes strings associated with both the experiment and ObsSpaceName
304  # rank, when present, is the four digit processor element rank. If a file matching
305  # the single-processor format is present, it will be selected in place of the other
306  # files.
307  #
308  # examples:
309  # + obsout_hofx_sondes_0000.nc4 OR
310  # + obsout_hofx_sondes.nc4
311  # osKey = "hofx_sondes"
312  #
313  # + obsout_variational_amsua_n19_0000.nc4 OR
314  # + obsout_variational_amsua_n19.nc4
315  # osKey = "variational_amsua_n19"
316  #
317  # + obsout_3dvar_bumpcov_bumploc_amsua_n19_0000.nc4 OR
318  # + obsout_3dvar_bumpcov_bumploc_amsua_n19.nc4
319  # osKey = "3dvar_bumpcov_bumploc_amsua_n19"
320  #
321  # + obsout_abi_g16_0000.nc4 OR
322  # + obsout_abi_g16.nc4
323  # osKey = "abi_g16"
324 
325  # note: when obsFKey (ObsSpace) files are concatenated into a single file, the
326  # optional geoval and ydiag files must also be concatenated in order to
327  # preserve locations ordering
328 
329  allFiles = {}
330  for fileType, prefix in self.filePrefixesfilePrefixes.items():
331  allFiles[fileType] = defaultdict(list)
332  for fileExt in supportedFileExts:
333  for pathPlusFile in glob.glob(data_path+'/'+prefix+'*.'+fileExt):
334  # fileName excludes the path
335  fileName = pathPlusFile.split('/')[-1]
336 
337  # fileParts as list, removing extension after final '.'
338  fileParts = ('.'.join(fileName.split('.')[:-1])).split('_')
339 
340  # remove known prefix
341  fileParts.remove(prefix)
342 
343  if IODAFileIsRanked(pathPlusFile):
344  # osKey excludes PE after the final '_'
345  osKey = '_'.join(fileParts[:-1])
346  else:
347  # osKey includes all remaining parts
348  osKey = '_'.join(fileParts)
349 
350  allFiles[fileType][osKey].append(pathPlusFile)
351 
352  # + transpose files for easy access by osKey
353  # + remove PE-ranked files when concatenated file is present
354  self.FilesFiles = {}
355  for fileType, osKeys in allFiles.items():
356  for osKey, files in osKeys.items():
357  if osKey not in self.FilesFiles:
358  self.FilesFiles[osKey] = defaultdict(list)
359  for pathPlusFile in files:
360  if not IODAFileIsRanked(pathPlusFile):
361  # for non-numerical suffix assume that this one file includes ALL locations
362  self.FilesFiles[osKey][fileType] = [pathPlusFile]
363  break
364  else:
365  # otherwise, append this file to list
366  self.FilesFiles[osKey][fileType].append(pathPlusFile)
367  del allFiles
368 
369  # error checking
370  self.nObsFilesnObsFiles = {}
371  self.loggersloggers = {}
372  for osKey, fileTypes in self.FilesFiles.items():
373  self.loggersloggers[osKey] = logging.getLogger(__name__+'.'+osKey)
374  nObsFiles = len(fileTypes.get(obsFKey,[]))
375  # force crash when there are no obsFKey files available for an osKey
376  if nObsFiles < 1:
377  self.loggersloggers[osKey].error('''
378  There are no '''+obsFKey+'''
379  feedback files with prefix '''+self.filePrefixesfilePrefixes[obsFKey]+'''
380  osKey = '''+osKey)
381  self.nObsFilesnObsFiles[osKey] = nObsFiles
382  # force crash when # of non-obs files is > 0 but != # of obsFKey files
383  for key, prefix in self.filePrefixesfilePrefixes.items():
384  if key == obsFKey: continue
385  nFiles = len(fileTypes.get(key,[]))
386  if nFiles > 0 and nFiles < nObsFiles:
387  self.loggersloggers[osKey].error('''
388  There are not enough '''+key+'''
389  feedback files with prefix '''+prefix+'''
390  #'''+obsFKey+' = '+str(nObsFiles)+'''
391  #'''+key+' = '+str(nFiles)+'''
392  osKey = '''+osKey)
393 
394  # eliminate osKeys that are designated to not be processed
395  self.ObsSpaceNameObsSpaceName = {}
396  self.ObsSpaceGroupObsSpaceGroup = {}
397  for osKey in list(self.FilesFiles.keys()):
398  # determine ObsSpace from osKey string
399  # and only process valid ObsSpaceNames
400  expt_parts = osKey.split("_")
401  nstr = len(expt_parts)
402  ObsSpaceInfo = conf.nullDiagSpaceInfo
403  for i in range(0,nstr):
404  ObsSpaceName_ = '_'.join(expt_parts[i:nstr])
405  ObsSpaceInfo_ = conf.DiagSpaceConfig.get( ObsSpaceName_,conf.nullDiagSpaceInfo)
406  if ObsSpaceInfo_['process']:
407  ObsSpaceName = deepcopy(ObsSpaceName_)
408  ObsSpaceInfo = deepcopy(ObsSpaceInfo_)
409  if ((len(osKeySelect)>0 and osKey not in osKeySelect) or
410  not ObsSpaceInfo['process'] or
411  ObsSpaceInfo.get('DiagSpaceGrp',conf.model_s) == conf.model_s):
412  del self.FilesFiles[osKey]
413  else:
414  self.ObsSpaceNameObsSpaceName[osKey] = deepcopy(ObsSpaceName)
415  self.ObsSpaceGroupObsSpaceGroup[osKey] = deepcopy(ObsSpaceInfo['DiagSpaceGrp'])
416 
417  # sort remaining files by rank
418  for osKey, fileTypes in self.FilesFiles.items():
419  for fileType, files in fileTypes.items():
420  ranks = []
421  for fileName in files:
422  rank = getIODAFileRank(fileName)
423  if pu.isint(rank):
424  ranks.append(int(rank))
425  elif len(files) == 1:
426  ranks = [1]
427  break
428  else:
429  self.loggersloggers[osKey].error('too many files chosen for a concatenated ObsSpace=> '+fileType)
430 
431  indices = list(range(len(files)))
432  indices.sort(key=ranks.__getitem__)
433  self.FilesFiles[osKey][fileType] = \
434  list(map(files.__getitem__, indices))
435 
436  self.HandlesHandles = {}
437 
438  # TODO: add function that concatenates files together, then
439  # TODO: replaces the original Files and/or Handles
440 
441  def initHandles(self, osKey):
442  #initialize Handles for osKey
443  # osKey (string) - experiment-ObsSpace key
444  self.loggersloggers[osKey].info('Initializing UFO file handles...')
445 
446  self.HandlesHandles[osKey] = {}
447 
448  for fileType, files in self.FilesFiles[osKey].items():
449  if len(files) == self.nObsFilesnObsFiles[osKey]:
450  self.loggersloggers[osKey].info(' fileType = '+fileType)
451  self.HandlesHandles[osKey][fileType] = FileHandles(files, 'r')
452 
453 
454  def destroyHandles(self, osKey):
455  #destroys file handles for osKey
456  # osKey (string) - experiment-ObsSpace key
457  if osKey in self.HandlesHandles:
458  for fileType, h in self.HandlesHandles[osKey].items():
459  h.close()
460  del self.HandlesHandles[osKey]
461 
462 
463  def fileFormat(self, osKey, fileType):
464  #return the file format for the first file handle
465  # osKey (string) - experiment-ObsSpace key
466  # fileType (string) - file type key for self.Files[osKey]
467 
468  # extract file handles
469  fHandles = self.HandlesHandles[osKey].get(fileType, None)
470  if fHandles is None:
471  self.loggersloggers[osKey].error('no files exist => '+fileType)
472 
473  return fHandles.fileFormat
474 
475 
476  def varList(self, osKey, fileType, selectGrp):
477  #return a list of variable names that fits the input parameters
478  # osKey (string) - experiment-ObsSpace key
479  # selectGrp (string) - variable group name desired from self.Files
480  # fileType (string) - file type key for self.Files[osKey]
481 
482  # extract file handles
483  fHandles = self.HandlesHandles[osKey].get(fileType, None)
484  if fHandles is None:
485  self.loggersloggers[osKey].error('no files exist => '+fileType)
486 
487  assert fileType == obsFKey, 'varList not implemented for '+fileType
488 
489  varlist = fHandles.varsINGroup(selectGrp)
490 
491  # sort varlist alphabetically or
492  # by integer suffix for uniform dictName (e.g., channel number)
493  indices = list(range(len(varlist)))
494  dictName0, suf = vu.splitIntSuffix(varlist[0])
495  intlist = []
496  sortbyInt = True
497  for var in varlist:
498  dictName, suf = vu.splitIntSuffix(var)
499  # do not sort by integer suffix when
500  # + there is any non-integer suffix
501  # + there is more than one unique dictName
502  if not pu.isint(suf) or dictName != dictName0:
503  sortbyInt = False
504  break
505  intlist.append(suf)
506 
507  if sortbyInt:
508  indices.sort(key=[int(i) for i in intlist].__getitem__)
509  else:
510  indices.sort(key=varlist.__getitem__)
511  varlist = list(map(varlist.__getitem__, indices))
512 
513  return varlist
514 
515 
516  def readVars(self, osKey, dbVars):
517  # osKey (string) - experiment-ObsSpace key
518  # dbVars (string) - list of variables desired from self.Handles[osKey]
519 
520  self.loggersloggers[osKey].info('Reading requested variables from UFO file(s)...')
521 
522  ObsSpace = self.HandlesHandles[osKey][obsFKey]
523  ObsDiagnostics = self.HandlesHandles[osKey].get(diagFKey, None)
524  GeoVaLs = self.HandlesHandles[osKey].get(geoFKey, None)
525 
526  #TODO: enable level selection for GeoVaLs and ObsDiagnostics during binning
527  # might need to store level value in dbVars members using unique suffix
528  diagLev = 0
529  geoLev = 0
530 
531  # Construct output dictionary
532  varsVals = {}
533  for grpVar in pu.uniqueMembers(dbVars):
534 
535  varName, grpName = vu.splitObsVarGrp(grpVar)
536 
537  if grpVar in ObsSpace.variables:
538  varsVals[grpVar] = ObsSpace.var1DatLocs(grpVar)
539 
540  elif vu.geoGroup in grpName and GeoVaLs is not None:
541  varsVals[grpVar] = GeoVaLs.var1DatLocs(grpVar, geoLev)
542 
543  elif vu.diagGroup in grpName and ObsDiagnostics is not None:
544  varsVals[grpVar] = ObsDiagnostics.var1DatLocs(grpVar, diagLev)
545 
546  else:
547  self.loggersloggers[osKey].error('grpVar not found => '+grpVar)
548 
549  return varsVals
def varsINGroup(self, group)
Definition: JediDB.py:181
def close(self)
Definition: JediDB.py:178
def datatype(self, var)
Definition: JediDB.py:166
def variableAS1DArray(self, var)
Definition: JediDB.py:172
def __init__(self, File, mode)
Definition: JediDB.py:161
def datatype(self, var)
Definition: JediDB.py:95
def varsINGroup(self, group)
Definition: JediDB.py:82
def close(self)
Definition: JediDB.py:78
def __init__(self, files, mode)
Definition: JediDB.py:64
def var1DatLocs(self, grpVar, level=None)
Definition: JediDB.py:98
def __init__(self, h, var2D, index)
Definition: JediDB.py:219
def variableAS1DArray(self, var)
Definition: JediDB.py:272
def __init__(self, File, mode)
Definition: JediDB.py:228
def datatype(self, var)
Definition: JediDB.py:261
def close(self)
Definition: JediDB.py:280
def readVars(self, osKey, dbVars)
Definition: JediDB.py:516
def initHandles(self, osKey)
Definition: JediDB.py:441
def __init__(self, data_path=default_path, osKeySelect=[])
Definition: JediDB.py:287
def fileFormat(self, osKey, fileType)
Definition: JediDB.py:463
def destroyHandles(self, osKey)
Definition: JediDB.py:454
def varList(self, osKey, fileType, selectGrp)
Definition: JediDB.py:476
def close(self)
Definition: JediDB.py:211
def datatype(self, var)
Definition: JediDB.py:205
def variableAS1DArray(self, var)
Definition: JediDB.py:208
def __init__(self, File, mode)
Definition: JediDB.py:195
def getIODAFileRank(pathPlusFile)
Definition: JediDB.py:24
def getIODAFileHandle(File, mode='r')
Definition: JediDB.py:49
def IODAFileIsRanked(pathPlusFile)
Definition: JediDB.py:45