IODA Bundle
Stager.cc
Go to the documentation of this file.
1 /*
2  * (C) Copyright 1996-2013 ECMWF.
3  *
4  * This software is licensed under the terms of the Apache Licence Version 2.0
5  * which can be obtained at http://www.apache.org/licenses/LICENSE-2.0.
6  * In applying this licence, ECMWF does not waive the privileges and immunities
7  * granted to it by virtue of its status as an intergovernmental organisation nor
8  * does it submit to any jurisdiction.
9  */
10 
11 
#include <ctype.h>
#include <fstream>
#include <memory>

#include "eckit/config/Resource.h"
#include "eckit/utils/StringTools.h"
#include "eckit/io/MemoryHandle.h"

#include "odc/FileCollector.h"
#include "odc/FileMapper.h"
#include "odc/Partition.h"
#include "odc/Indexer.h"
#include "odc/Partitioner.h"
#include "odc/Writer.h"
#include "odc/Stager.h"
#include "odc/RequestUtils.h"
27 
28 using namespace std;
29 using namespace eckit;
30 using namespace odc;
31 using namespace odc::tool;
32 using namespace odc::core;
33 
35  const std::vector<std::string>& keywords,
36  const std::map<std::string,std::vector<std::string> >& request)
37 {
38  vector<string> expandedRoots, roots (eckit::StringTools::split(":", request.at("odbserverroots")[0]));
39  for (size_t i(0); i < roots.size(); ++i)
40  expandedRoots.push_back(FileCollector::expandTilde(roots[i]));
41  mapper.addRoots(expandedRoots);
42  mapper.checkRoots();
43 }
44 
45 size_t Stager::numberOfPartitions(const std::map<std::string,std::vector<std::string> >& request)
46 {
47  vector<string> nParts ( request.at("n_parts") );
48  if (nParts.size() != 1)
49  throw UserError(string("N_PARTS should have one value"));
50 
51  size_t n (atol(nParts[0].c_str()));
52  if (n == 0)
53  throw UserError ("STAGE: N_PARTS should be a positive integer, was '" + nParts[0] + "'");
54  return n;
55 }
56 
57 void Stager::createIndices(const std::vector<eckit::PathName>& files)
58 {
59  Log::info() << "Creating indices for ";
60  for (size_t i(0); i < files.size(); ++i)
61  Log::info() << files[i] << "," << endl;
62  Log::info() << endl;
63 
64  Indexer::createIndex(files); // for now for testing, indexing will be done some time earlier
65  Log::info() << "Created indices." << endl;
66 }
67 
68 vector<PathName> Stager::writePartitionsToFiles (const Partitions& partitions, const string& pathNamePrefix, const string& fileListPath)
69 {
70  Log::info() << "Writing partitions: " << partitions << endl << " to files:" << endl;
71 
72  vector<PathName> ps (partitions.write(pathNamePrefix));
73 
74  ofstream f;
75  f.exceptions(ofstream::badbit | ofstream::failbit);
76  f.open(fileListPath.c_str());
77  for (size_t i (0); i < ps.size(); ++i)
78  {
79  Log::info() << i << ": " << ps[i] << endl;
80  f << ps[i] << endl;
81  }
82  f.close();
83 
84  Log::info() << "List of files with partitions written to: " << fileListPath << endl;
85 
86  return ps;
87 }
88 
89 void Stager::stage(eckit::MultiHandle& output,
90  const std::vector<std::string>& keywords,
91  const std::map<std::string,std::vector<std::string> >& req)
92 {
94 
95  Log::info() << "STAGE: request: " << request << endl;
96 
97  odc::checkKeywordsHaveValues(request, keywords);
98  const string partitionsInfoFile (FileCollector::expandTilde(request.at("partitionsinfo")[0]));
99 
100  FileMapper mapper (request.at("odbpathnameschema")[0]);
101  prepareMapper(mapper, keywords, request);
102 
103  MultiHandle devNull;
104  FileCollector fileCollector (mapper, devNull);
105  fileCollector.findFiles(keywords, request);
106 
107  if(devNull.estimate() == Length(0))
108  Log::userWarning() << "Data not found" << endl;
109 
110  vector<PathName> files (fileCollector.foundFilesAsPathNames());
111  createIndices (files);
112 
113  Partitions partitions (Partitioner::createPartitions(files, numberOfPartitions(request)));
114  size_t requestedNumberOfPartitions (numberOfPartitions(request));
115 
116  if (partitions.size() != requestedNumberOfPartitions)
117  Log::warning() << "Number of partitions (" << partitions.size()
118  << ") different than requested: " << requestedNumberOfPartitions << std::endl;
119 
120  Log::info() << "Saving partitions info to " << partitionsInfoFile << endl;
121  partitions.save(partitionsInfoFile);
122 
123  vector<PathName> dataFiles (writePartitionsToFiles (partitions, "odb.partition", partitionsInfoFile + ".files"));
124 
125  sendPartitionsInfo(output, partitions);
126 }
127 
128 void Stager::sendPartitionsInfo(MultiHandle& output, const Partitions& partitions)
129 {
130  Log::info() << "partitions: " << endl << partitions << endl;
131 
132  DataHandle* dh = new eckit::MemoryHandle(1024, true);
133  dh->openForWrite(0);
134  Writer<> out(*dh);
135  Writer<>::iterator o (out.begin());
136  MetaData md(o->columns());
137 
138  md.addColumn("partition_number", "INTEGER");
139  md.addColumn("number_of_rows", "INTEGER");
140  o->columns(md);
141  o->writeHeader();
142 
143  for (size_t i(0); i < partitions.size(); ++i)
144  {
145  const Partition& p(partitions[i]);
146  (*o)[0] = i;
147  (*o)[1] = p.numberOfRows();
148  ++o;
149  }
150 
151  dh->openForRead();
152  output += dh;
153 }
154 
void findFiles(const std::vector< std::string > &, const std::map< std::string, std::vector< std::string > > &)
std::vector< eckit::PathName > foundFilesAsPathNames() const
static std::string expandTilde(const std::string &s)
void checkRoots() const
Definition: FileMapper.cc:39
void addRoots(const std::vector< std::string > &)
Definition: FileMapper.cc:106
static void sendPartitionsInfo(eckit::MultiHandle &, const odc::Partitions &)
Definition: Stager.cc:128
static void stage(eckit::MultiHandle &output, const std::vector< std::string > &keywords, const std::map< std::string, std::vector< std::string > > &request)
Definition: Stager.cc:89
static void prepareMapper(FileMapper &mapper, const std::vector< std::string > &keywords, const std::map< std::string, std::vector< std::string > > &request)
Definition: Stager.cc:34
static size_t numberOfPartitions(const std::map< std::string, std::vector< std::string > > &)
Definition: Stager.cc:45
static void createIndices(const std::vector< eckit::PathName > &)
Definition: Stager.cc:57
static std::vector< eckit::PathName > writePartitionsToFiles(const odc::Partitions &, const std::string &pathNamePrefix, const std::string &fileListPath)
Definition: Stager.cc:68
ullong numberOfRows() const
Definition: Partition.cc:98
std::vector< eckit::PathName > write(const eckit::PathName &fileNamePrefix) const
Definition: Partitions.cc:37
void save(const eckit::PathName &)
Definition: Partitions.cc:58
void writeHeader()
const core::MetaData & columns() const
Definition: IteratorProxy.h:94
iterator begin(bool openDataHandle=true)
Definition: Writer.cc:92
MetaData & addColumn(const std::string &name, const std::string &type)
Definition: MetaData.cc:279
Definition: ColumnInfo.h:23
RequestDict unquoteRequestValues(const RequestDict &request)
Definition: RequestUtils.cc:26
void checkKeywordsHaveValues(const RequestDict &request, const std::vector< std::string > &keywords)
Definition: RequestUtils.cc:38
std::map< std::string, std::vector< std::string > > RequestDict
Definition: RequestUtils.h:27
real(kind_real), parameter, public req
Earth radius at equator (m)
Definition: encode.cc:30