3 from multiprocessing
import Pool
# subset(infile, nlocsout, suffix, geofile, diagfile)
#
# Writes a reduced copy of the obs file `infile` to `infile[:-4]+suffix`, then
# does the same for `geofile` and `diagfile`, reusing the same `flag` selection
# along 'nlocs' so the three outputs stay aligned.
#
# NOTE(review): this listing is fragmentary. Every statement is prefixed with
# its original file line number and those numbers jump (e.g. 16 -> 20,
# 27 -> 34, 36 -> 42), so some statements -- including the ones that define
# `npossible`, `d_size` and `vname`, and any `else:` branches -- are not
# visible here. The comments below describe only what is shown.
12 def subset(infile, nlocsout, suffix, geofile, diagfile):
13 print(
'Processing:', infile)
# Output name: input path minus its last 4 characters, plus `suffix`.
14 outfile = infile[:-4]+suffix
15 ncin = nc.Dataset(infile)
16 ncout = nc.Dataset(outfile,
'w')
# Read the per-location record numbers and count the distinct records.
20 recnum = ncin.variables[
'record_number@MetaData'][:]
21 recnum_uniq = np.unique(recnum)
22 nrecsin = len(recnum_uniq)
# Sizes of the 'nlocs' and 'nvars' dimensions of the input file.
23 nlocsin = len(ncin.dimensions[
'nlocs'])
24 nvars = len(ncin.dimensions[
'nvars'])
# Sample count: one tenth of the requested nlocsout (truncated), at least 1.
# NOTE(review): the condition guarding this (original line 25) is not shown.
26 nsamples =
int(nlocsout/10.)
27 nsamples = max(1, nsamples)
# Requested more locations than the file has; the body of this test
# (original line 35) is not shown.
34 if nlocsout > nlocsin:
# Draw `nsamples` distinct indices from 0..npossible-1.
# `npossible` is assigned in lines that are not part of this listing.
36 flag = random.sample(list(np.arange(0, npossible)), nsamples)
# Turn the selection into a mask over locations: True where the location's
# record number is contained in `flag`.
# NOTE(review): original lines 37-41 are missing, so `flag` may have been
# transformed before this point -- confirm against the full file.
42 flag = np.isin(ncin.variables[
'record_number@MetaData'][:], flag)
# Recompute nlocsout as the number of locations actually selected.
43 nlocsout = len(ncin.variables[
'record_number@MetaData'][flag, ...])
# Copy every global attribute from the input file to the output file.
50 for aname
in ncin.ncattrs():
51 avalue = ncin.getncattr(aname)
52 ncout.setncattr(aname, avalue)
# Set the global "nlocs" attribute to the subset size (as int32).
54 ncout.setncattr(
"nlocs", np.int32(nlocsout))
# Create the output dimensions. 'nlocs' is tested for explicitly; `d_size` is
# set in lines not shown (original 58-60).
56 for dim
in ncin.dimensions.values():
57 if dim.name ==
'nlocs':
61 ncout.createDimension(dim.name, d_size)
# Copy variables: same dtype and dimensions as the input variable.
# `vname` is assigned on original line 64, which is not shown.
63 for var
in ncin.variables.values():
65 vdata = ncin.variables[vname]
66 var_out = ncout.createVariable(vname, var.dtype, var.dimensions)
# Variables whose first dimension is 'nlocs' are subset with `flag`;
# the alternative branch (original lines 69-71) is not shown.
67 if (var.dimensions[0] ==
'nlocs'):
68 var_out[...] = vdata[flag, ...]
# Copy the variable's attributes to the output variable.
72 for aname
in var.ncattrs():
73 avalue = var.getncattr(aname)
74 var_out.setncattr_string(aname, avalue)
78 print(
'wrote obs to:', outfile)
# ---- geovals file: same procedure, reusing `flag` and `nlocsout` ----
# NOTE(review): whether this section is conditional on `geofile` being set
# cannot be seen from this listing (original lines 79-80 missing).
81 outfile = geofile[:-4]+suffix
82 ncin = nc.Dataset(geofile)
83 ncout = nc.Dataset(outfile,
'w')
# Copy global attributes, then set "nlocs" to the subset size.
85 for aname
in ncin.ncattrs():
86 avalue = ncin.getncattr(aname)
87 ncout.setncattr(aname, avalue)
89 ncout.setncattr(
"nlocs", np.int32(nlocsout))
# Create dimensions (`d_size` comes from lines not shown).
91 for dim
in ncin.dimensions.values():
92 if dim.name ==
'nlocs':
96 ncout.createDimension(dim.name, d_size)
# Copy variables; 'nlocs'-leading ones are subset with `flag`.
98 for var
in ncin.variables.values():
100 vdata = ncin.variables[vname]
101 var_out = ncout.createVariable(vname, var.dtype, var.dimensions)
102 if (var.dimensions[0] ==
'nlocs'):
103 var_out[...] = vdata[flag, ...]
# Full copy -- presumably the `else:` branch (original line 104 not shown).
105 var_out[:] = vdata[:]
109 print(
'wrote geovals to:', outfile)
# ---- obsdiag file: same procedure, reusing `flag` and `nlocsout` ----
# NOTE(review): any guard on `diagfile` (original lines 110-112) is not shown.
113 outfile = diagfile[:-4]+suffix
114 ncin = nc.Dataset(diagfile)
115 ncout = nc.Dataset(outfile,
'w')
# Copy global attributes, then set "nlocs" to the subset size.
117 for aname
in ncin.ncattrs():
118 avalue = ncin.getncattr(aname)
119 ncout.setncattr(aname, avalue)
121 ncout.setncattr(
"nlocs", np.int32(nlocsout))
# Create dimensions (`d_size` comes from lines not shown).
123 for dim
in ncin.dimensions.values():
124 if dim.name ==
'nlocs':
128 ncout.createDimension(dim.name, d_size)
# Copy variables; 'nlocs'-leading ones are subset with `flag`.
130 for var
in ncin.variables.values():
132 vdata = ncin.variables[vname]
133 var_out = ncout.createVariable(vname, var.dtype, var.dimensions)
134 if (var.dimensions[0] ==
'nlocs'):
135 var_out[...] = vdata[flag, ...]
# Full copy -- presumably the `else:` branch (original line 136 not shown).
137 var_out[:] = vdata[:]
141 print(
'wrote obsdiag to:', outfile)
# Script body: parse the command line, then submit one `subset` task per
# '*.nc4' file in the given directory to a multiprocessing pool.
#
# NOTE(review): this listing is fragmentary. Statements carry their original
# file line numbers and those numbers jump (e.g. 161 -> 172, 183 -> 189), so
# the lines that assign `nobs` and `suffix`, and the bodies of the `if` tests
# below, are not visible here.
147 ap = argparse.ArgumentParser()
# -m / -s: boolean switches selecting the subset size (see help strings).
148 ap.add_argument(
"-m",
"--medium", action=
'store_true',
149 help=
"Subset to 5 records or 100 obs")
150 ap.add_argument(
"-s",
"--single", action=
'store_true',
151 help=
"Output single observation or record")
# Optional directories holding the matching geoval / obsdiag files.
152 ap.add_argument(
"-g",
"--geovals", help=
"Path to geoval directory")
153 ap.add_argument(
"-d",
"--obsdiag", help=
"Path to obsdiag directory")
# Positional: directory containing the obs files.
154 ap.add_argument(
"filedir", help=
"Path to obs files to process")
# Pool size; argparse delivers it as a string, converted with int() below.
155 ap.add_argument(
"-n",
"--nprocs",
156 help=
"Number of tasks/processors for multiprocessing")
158 MyArgs = ap.parse_args()
# NOTE(review): any guard for a missing -n (original lines 159-160) is not
# shown; int(None) would raise TypeError if this runs unguarded.
161 nprocs =
int(MyArgs.nprocs)
# Message for the case where neither -m nor -s was given; the surrounding
# branch and any exit call (original lines 162-174) are not shown.
172 print(
'need either -m or -s, exiting...')
175 InDir = MyArgs.filedir
# Collect every '*.nc4' file directly under the input directory.
177 infiles = glob.glob(InDir+
'/*.nc4')
178 obspool = Pool(processes=nprocs)
180 for infile
in infiles:
# Files smaller than 10000 bytes are tested for here; the action taken
# (original line 182) is not shown.
181 if os.path.getsize(infile) < 10000:
# Names ending in '_m.nc4' / '_s.nc4' are tested for here -- presumably to
# skip outputs of a previous run; the body is not shown, confirm.
183 if infile[-6:]
in [
'_m.nc4',
'_s.nc4']:
# Build the geoval path: basename of the obs file with 'obs' replaced by
# 'geoval', under the --geovals directory. str.replace substitutes every
# occurrence of 'obs' in the basename.
189 inob = infile.split(
'/')[-1]
190 ingeo = inob.replace(
'obs',
'geoval')
191 geofile = MyArgs.geovals+
'/'+ingeo
# Same for the obsdiag path, under the --obsdiag directory.
195 inob = infile.split(
'/')[-1]
196 indiag = inob.replace(
'obs',
'obsdiag')
197 diagfile = MyArgs.obsdiag+
'/'+indiag
# Queue the work asynchronously. `nobs` and `suffix` are assigned in lines not
# shown; `res` is not used in any visible line.
198 res = obspool.apply_async(subset, args=(infile, nobs, suffix, geofile, diagfile))
def subset(infile, nlocsout, suffix, geofile, diagfile)