10 from datetime
import datetime
12 import multiprocessing
# Regular expressions that classify a split-out BUFR file by its subset name.
# They are applied with re.match, i.e. anchored at the start of the file name.
OLD_SUBSET_PATTERN = r'NC0050([1245678]\d|90)'
NEW_SUBSET_PATTERN = r'NC0050(3\d|91)'

# The YAML templates are looked up in the directory that contains this script,
# independent of the current working directory.
EXE_DIR = pathlib.Path(__file__).parent.absolute()
OLD_SUBSET_TEMPLATE_PATH = os.path.join(EXE_DIR,
                                        'satwnds_old_subset_template.yaml')
NEW_SUBSET_TEMPLATE_PATH = os.path.join(EXE_DIR,
                                        'satwnds_new_subset_template.yaml')
def _make_file_from_template(template_path, replacements, output_path):
    """
    Render a text template by substituting its ``{{ key }}`` tags.

    Each ``{{ key }}`` tag (whitespace inside the braces is optional) is
    replaced by ``replacements[key]``; all other text is copied unchanged.
    The output file is only opened after the whole template has been
    rendered, so an unknown tag does not leave a partial file behind.

    :param template_path: Path of the template file to read.
    :param replacements: Mapping from tag key to the string to insert.
    :param output_path: Path of the rendered file to write.
    :raises ValueError: If the template contains a tag whose key is missing
                        from ``replacements``.
    """
    tag_re = re.compile(r'{{\s*(?P<key>\w+)\s*}}')

    def _replace_tag(match):
        key = match.group('key')
        if key not in replacements:
            raise ValueError(
                f'Unknown tag with key {key} in {template_path}')
        return replacements[key]

    with open(template_path, 'r') as template_file:
        # A callable replacement gives every tag the value of its own key
        # (a plain replacement string would overwrite all tags on the line
        # with the same text) and inserts that value literally, without
        # treating backslashes in it as regex escapes.
        lines = [tag_re.sub(_replace_tag, line) for line in template_file]

    with open(output_path, 'w') as new_file:
        new_file.writelines(lines)
def _process_bufr_path(path):
    """
    Run bufr2ioda.x on one split-out subset BUFR file.

    The file name is matched against OLD_SUBSET_PATTERN and
    NEW_SUBSET_PATTERN to pick the YAML template.  A configuration named
    ``<file_name>.yaml`` is rendered into the current working directory,
    handed to ``bufr2ioda.x`` and removed again afterwards.  A file name that
    matches neither pattern is reported and skipped.

    :param path: Path of the subset BUFR file; only its last component is
                 used, so the file is expected in the current directory.
    :raises OSError: If ``bufr2ioda.x`` cannot be launched.
    """
    file_name = os.path.split(path)[1]
    print(f'Running {file_name}.')

    if re.match(OLD_SUBSET_PATTERN, file_name):
        yaml_template = OLD_SUBSET_TEMPLATE_PATH
    elif re.match(NEW_SUBSET_PATTERN, file_name):
        yaml_template = NEW_SUBSET_TEMPLATE_PATH
    else:
        print(f'Warning: Found undefined subset {file_name}')
        return

    yaml_out_path = f'{file_name}.yaml'
    _make_file_from_template(yaml_template,
                             {'obsdatain': file_name,
                              'obsdataout': f'{file_name}.nc'},
                             yaml_out_path)

    try:
        # Argument list instead of a shell string: the file name is passed
        # verbatim, whatever characters it contains.
        status = subprocess.call(['bufr2ioda.x', yaml_out_path])
        if status != 0:
            print(f'Warning: bufr2ioda.x exited with status {status} '
                  f'for {file_name}')
    finally:
        # Remove the generated configuration even if the converter could not
        # be started.
        os.remove(yaml_out_path)
def run(bufr_path, num_threads, output_dir):
    """
    Splits a Sat Winds file into its subset components and runs bufr2ioda on
    each one.

    :param bufr_path: Path to the Sat winds Bufr file.
    :param num_threads: Number of concurrent converters.
    :param output_dir: Directory where to place the result files.
    """

    def _set_up_working_dir(out_dir):
        # NOTE(review): only the timestamp, the working-directory name and the
        # removal of a stale directory were visible in the reviewed listing.
        # Placing the directory under out_dir, creating it and changing into
        # it are reconstructed -- confirm against the complete file.
        timestamp_str = datetime.now().strftime("%Y%m%d%H%M%S")
        working_dir = os.path.join(out_dir,
                                   f'satwnd_processing_{timestamp_str}')
        if os.path.exists(working_dir):
            shutil.rmtree(working_dir)
        os.makedirs(working_dir)
        os.chdir(working_dir)

    def _clean_working_dir(bufr_paths):
        # NOTE(review): the loop body was not visible in the reviewed listing;
        # deleting the split subset files is an assumption -- confirm.
        for path in bufr_paths:
            os.remove(path)

    # Resolve the input to an absolute path before the working directory is
    # set up, so it stays valid if the current directory changes.
    input_path = os.path.realpath(bufr_path)

    _set_up_working_dir(output_dir)

    # Split the input; the subset files are picked up below by the relative
    # glob 'NC*'.
    subprocess.call(['split_by_subset.x', input_path])

    bufr_paths = glob.glob('NC*')
    try:
        # The context manager shuts the worker processes down once map()
        # has returned all results.
        with multiprocessing.Pool(num_threads) as pool:
            pool.map(_process_bufr_path, bufr_paths)
    finally:
        # Clean up the subset files even when a conversion raised.
        _clean_working_dir(bufr_paths)
if __name__ == '__main__':
    # Adjacent literals instead of a backslash continuation inside the
    # string, which would embed the source indentation in the text.
    DESCRIPTION = ('Split a sat wind bufr file into its subsets, and runs '
                   'bufr2ioda.x on each one with the proper configuration.')

    parser = argparse.ArgumentParser(description=DESCRIPTION)
    parser.add_argument('file',
                        help="SatWnd file to split.")

    # NOTE(review): the long option names are inferred from the attributes
    # read below (args.threads, args.output_dir); the types and defaults were
    # not visible in the reviewed listing -- confirm.
    parser.add_argument('-t',
                        '--threads',
                        default=1,
                        type=int,
                        help="Number of concurrent instances of bufr2ioda.")

    parser.add_argument('-o',
                        '--output_dir',
                        default='.',
                        help="Directory where to put the resulting netcdf "
                             "files.")

    args = parser.parse_args()

    # Print the wall-clock duration of the whole run, in seconds.
    start_time = datetime.now()
    run(args.file, args.threads, args.output_dir)
    print((datetime.now() - start_time).total_seconds())
def _make_file_from_template(template_path, replacements, output_path)
def _process_bufr_path(path)
def run(bufr_path, num_threads, output_dir)