IODA Bundle
MergeTool.cc
Go to the documentation of this file.
1 /*
2  * (C) Copyright 1996-2012 ECMWF.
3  *
4  * This software is licensed under the terms of the Apache Licence Version 2.0
5  * which can be obtained at http://www.apache.org/licenses/LICENSE-2.0.
6  * In applying this licence, ECMWF does not waive the privileges and immunities
7  * granted to it by virtue of its status as an intergovernmental organisation nor
8  * does it submit to any jurisdiction.
9  */
10 
11 #include "eckit/io/FileHandle.h"
12 #include "eckit/log/Timer.h"
13 #include "odc/Reader.h"
14 #include "odc/Select.h"
15 #include "odc/Writer.h"
16 #include "odc/tools/MergeTool.h"
17 
18 using namespace eckit;
19 using namespace odc::core;
20 
21 namespace odc {
22 namespace tool {
23 
24 void MergeTool::help(std::ostream &o) {
25  o << "Merges rows from files";
26 }
27 
28 void MergeTool::usage(const std::string& name, std::ostream &o)
29 {
30  o << name << " -o <output-file.odb> <input1.odb> <input2.odb> ..." << std::endl
31  << "" << std::endl
32  << "\t or " << std::endl
33  << "" << std::endl
34  << name << "\t -S -o <output-file.odb> <input1.odb> <sql-select1> <input2.odb> <sql-select2> ..." << std::endl;
35 }
36 
// Constructs the tool from the command line: reads the -S flag and the -o
// argument, then collects the input files from the positional parameters.
//
// NOTE(review): the listing's own line numbers jump 43->45, 47->49 and
// 62->64, so three statements of this body are absent from this extract.
// Restore them from the original file before changing any code here.
37 MergeTool::MergeTool (int ac, char *av[])
38 : Tool(ac, av),
39  inputFiles_(),
40  sql_(),
41  outputFile_(),
42  sqlFiltering_(false)
43 {
// (listing line 44 is missing from this extract)
// Fewer than three parameters: log "Usage:" and leave the constructor early.
// NOTE(review): this returns a constructed object whose inputFiles_ is
// empty rather than signalling an error to the caller.
45  if (parameters().size() < 3)
46  {
47  Log::error() << "Usage:";
// (listing line 48 is missing from this extract)
49  Log::error() << std::endl;
50  return;
51  }
52  sqlFiltering_ = optionIsSet("-S");
// "<no-default>" is a sentinel meaning that -o was not given.
53  std::string o(optionArgument("-o", std::string("<no-default>")));
54  if (o == "<no-default>")
// NOTE(review): there is no `throw` in front of UserError here, so this
// looks like the error object is built and discarded, after which
// outputFile_ is set to the sentinel text. Confirm whether a throw was
// intended.
55  UserError("Output file is obligatory (option -o)");
56  outputFile_ = o;
57 
// Positional parameters are read from index 1; index 0 is skipped
// (presumably the tool name - TODO confirm).
58  for (size_t i = 1; i < parameters().size(); ++i)
59  {
60  inputFiles_.push_back(PathName(parameters()[i]));
61  if (sqlFiltering_) {
// With -S the parameter after each input file is consumed as well.
// NOTE(review): `++i` is not checked against parameters().size(), so an
// input file without a following parameter indexes past the end.
62  std::string s(parameters()[++i]);
// (listing line 63 is missing from this extract; `s` is presumably used there)
64  }
65  }
66 }
67 
68 
// NOTE(review): the function header (listing line 69) and the statements on
// listing lines 78 and 80 are absent from this extract, so the if/else below
// has no visible branch bodies. Restore them from the original file before
// changing any code here.
70 {
// No input files collected: nothing to do.
71  if (inputFiles_.size() == 0)
72  return;
// Build the input file list for the timer label; every name is followed by
// a comma, so the list ends with a trailing ','.
73  std::stringstream s;
74  for (size_t i = 0; i < inputFiles_.size(); ++i)
75  s << inputFiles_[i] << ",";
76  Timer t(std::string("Merging files '") + s.str() + "' into '" + outputFile_ + "'");
// Branches on whether -S (SQL filtering) was requested; the branch bodies
// are the missing listing lines.
77  if(! sqlFiltering_)
79  else
81 }
82 
83 template <typename T, typename I>
84 void doMerge(std::vector<std::pair<I, I> >& iterators, const PathName& outputFile)
85 {
86  odc::Writer<> writer(outputFile);
87  odc::Writer<>::iterator out(writer.begin());
88 
89  for (size_t i = 0; i < iterators.size(); ++i)
90  {
91  MetaData columns (iterators[i].first->columns());
92 
93  for (size_t i = 0; i < columns.size(); ++i)
94  if (out->columns().hasColumn(columns[i]->name()))
95  throw eckit::UserError(std::string("Column '") + columns[i]->name()
96  + "' occurs in more than one input file of merge.");
97  MetaData md (out->columns());
98  md += columns;
99  out->columns(md);
100  //out->columns() += columns;
101  }
102 
103  out->writeHeader();
104  Log::info() << "MergeTool::merge: output metadata: " << out->columns() << std::endl;
105 
106  for(;;)
107  {
108  for (size_t i = 0, ii = 0; ii < iterators.size(); ++ii)
109  {
110  I& in(iterators[ii].first);
111  I& inEnd(iterators[ii].second);
112  if(! (in != inEnd))
113  return (void) (Log::info() << "Input file number " << ii << " ended." << std::endl);
114 
115  for (size_t cn = 0; cn < in->columns().size(); ++cn)
116  {
117  ASSERT(i < out->columns().size());
118  out->data(i++) = (*in)[cn];
119  }
120  ++in;
121  }
122 
123  ++out;
124  }
125 }
126 
/// Vector of raw pointers that deletes every element when it is destroyed.
///
/// Copying is disabled: a copy would hold the same pointers and both
/// destructors would delete them. The copy operations are declared private
/// and left undefined so the struct stays usable with pre-C++11 compilers.
template <typename T>
struct AutoR : public std::vector<T*>
{
    // Needed explicitly because declaring the copy constructor below
    // suppresses the implicit default constructor.
    AutoR() {}

    ~AutoR()
    {
        for (size_t i = 0; i < this->size(); ++i)
            delete (*this)[i];
    }

private:
    AutoR(const AutoR&);
    AutoR& operator=(const AutoR&);
};
129 
130 void MergeTool::merge(const std::vector<PathName>& inputFiles, const PathName& outputFile)
131 {
132  typedef odc::Reader R;
133  typedef R::iterator I;
134 
135  AutoR<R> readers;
136  std::vector<std::pair<I, I> > iterators;
137 
138  for (size_t i = 0; i < inputFiles.size(); ++i)
139  {
140  readers.push_back(new odc::Reader(inputFiles[i]));
141  iterators.push_back(std::make_pair(readers[i]->begin(), readers[i]->end()));
142  }
143  doMerge<R, I>(iterators, outputFile);
144 }
145 
146 void MergeTool::merge(const std::vector<PathName>& inputFiles, const std::vector<std::string>& sqls, const PathName& outputFile)
147 {
148  typedef odc::Select S;
149  AutoR<S> readers;
150  AutoR<eckit::FileHandle> fhs;
151  std::vector<std::pair<S::iterator, S::iterator> > iterators;
152  for (size_t i = 0; i < inputFiles.size(); ++i)
153  {
154  FileHandle* fh = new FileHandle(inputFiles[i]);
155  fh->openForRead();
156  fhs.push_back(fh);
157  readers.push_back(new S(sqls[i], *fhs[i]));
158  iterators.push_back(std::make_pair(readers[i]->begin(), readers[i]->end()));
159  }
160  doMerge<S, S::iterator>(iterators, outputFile);
161 }
162 
163 } // namespace tool
164 } // namespace odc
165 
StringTools S
void writeHeader()
DATA * data()
Definition: IteratorProxy.h:77
const core::MetaData & columns() const
Definition: IteratorProxy.h:94
static std::string readFile(const eckit::PathName fileName, bool logging=false)
Definition: StringTool.cc:40
static bool isSelectStatement(const std::string &)
Definition: StringTool.cc:196
bool hasColumn(const std::string &) const
Definition: MetaData.cc:87
bool optionIsSet(const std::string &)
T optionArgument(const std::string &, T defaultValue)
void registerOptionWithArgument(const std::string &)
const std::vector< std::string > parameters()
static void usage(const std::string &name, std::ostream &o)
Definition: MergeTool.cc:28
std::vector< std::string > sql_
Definition: MergeTool.h:41
static void merge(const std::vector< eckit::PathName > &inputFiles, const eckit::PathName &outputFileName)
eckit::PathName outputFile_
Definition: MergeTool.h:42
std::vector< eckit::PathName > inputFiles_
Definition: MergeTool.h:40
void doMerge(std::vector< std::pair< I, I > > &iterators, const PathName &outputFile)
Definition: MergeTool.cc:84
Definition: ColumnInfo.h:23
subroutine usage()
Definition: odc_ls.f90:59