15 #include "eckit/config/Resource.h"
16 #include "eckit/utils/StringTools.h"
17 #include "eckit/io/MemoryHandle.h"
29 using namespace eckit;
35 const std::vector<std::string>& keywords,
36 const std::map<std::string,std::vector<std::string> >& request)
38 vector<string> expandedRoots, roots (eckit::StringTools::split(
":", request.at(
"odbserverroots")[0]));
39 for (
size_t i(0);
i < roots.size(); ++
i)
47 vector<string> nParts ( request.at(
"n_parts") );
48 if (nParts.size() != 1)
49 throw UserError(
string(
"N_PARTS should have one value"));
51 size_t n (atol(nParts[0].c_str()));
53 throw UserError (
"STAGE: N_PARTS should be a positive integer, was '" + nParts[0] +
"'");
59 Log::info() <<
"Creating indices for ";
60 for (
size_t i(0);
i < files.size(); ++
i)
61 Log::info() << files[
i] <<
"," << endl;
64 Indexer::createIndex(files);
65 Log::info() <<
"Created indices." << endl;
70 Log::info() <<
"Writing partitions: " << partitions << endl <<
" to files:" << endl;
72 vector<PathName> ps (partitions.
write(pathNamePrefix));
75 f.exceptions(ofstream::badbit | ofstream::failbit);
76 f.open(fileListPath.c_str());
77 for (
size_t i (0);
i < ps.size(); ++
i)
79 Log::info() <<
i <<
": " << ps[
i] << endl;
84 Log::info() <<
"List of files with partitions written to: " << fileListPath << endl;
90 const std::vector<std::string>& keywords,
91 const std::map<std::string,std::vector<std::string> >&
req)
95 Log::info() <<
"STAGE: request: " << request << endl;
100 FileMapper mapper (request.at(
"odbpathnameschema")[0]);
101 prepareMapper(mapper, keywords, request);
105 fileCollector.
findFiles(keywords, request);
107 if(devNull.estimate() == Length(0))
108 Log::userWarning() <<
"Data not found" << endl;
111 createIndices (files);
113 Partitions partitions (Partitioner::createPartitions(files, numberOfPartitions(request)));
114 size_t requestedNumberOfPartitions (numberOfPartitions(request));
116 if (partitions.size() != requestedNumberOfPartitions)
117 Log::warning() <<
"Number of partitions (" << partitions.size()
118 <<
") different than requested: " << requestedNumberOfPartitions << std::endl;
120 Log::info() <<
"Saving partitions info to " << partitionsInfoFile << endl;
121 partitions.
save(partitionsInfoFile);
123 vector<PathName> dataFiles (writePartitionsToFiles (partitions,
"odb.partition", partitionsInfoFile +
".files"));
125 sendPartitionsInfo(output, partitions);
130 Log::info() <<
"partitions: " << endl << partitions << endl;
132 DataHandle* dh =
new eckit::MemoryHandle(1024,
true);
138 md.
addColumn(
"partition_number",
"INTEGER");
139 md.
addColumn(
"number_of_rows",
"INTEGER");
143 for (
size_t i(0);
i < partitions.size(); ++
i)
void findFiles(const std::vector< std::string > &, const std::map< std::string, std::vector< std::string > > &)
std::vector< eckit::PathName > foundFilesAsPathNames() const
static std::string expandTilde(const std::string &s)
void addRoots(const std::vector< std::string > &)
static void sendPartitionsInfo(eckit::MultiHandle &, const odc::Partitions &)
static void stage(eckit::MultiHandle &output, const std::vector< std::string > &keywords, const std::map< std::string, std::vector< std::string > > &request)
static void prepareMapper(FileMapper &mapper, const std::vector< std::string > &keywords, const std::map< std::string, std::vector< std::string > > &request)
static size_t numberOfPartitions(const std::map< std::string, std::vector< std::string > > &)
static void createIndices(const std::vector< eckit::PathName > &)
static std::vector< eckit::PathName > writePartitionsToFiles(const odc::Partitions &, const std::string &pathNamePrefix, const std::string &fileListPath)
ullong numberOfRows() const
std::vector< eckit::PathName > write(const eckit::PathName &fileNamePrefix) const
void save(const eckit::PathName &)
const core::MetaData & columns() const
iterator begin(bool openDataHandle=true)
RequestDict unquoteRequestValues(const RequestDict &request)
void checkKeywordsHaveValues(const RequestDict &request, const std::vector< std::string > &keywords)
std::map< std::string, std::vector< std::string > > RequestDict
real(kind_real), parameter, public req
Earth radius at equator (m)