IODA Bundle
Partitioner.cc
Go to the documentation of this file.
1 /*
2  * (C) Copyright 1996-2012 ECMWF.
3  *
4  * This software is licensed under the terms of the Apache Licence Version 2.0
5  * which can be obtained at http://www.apache.org/licenses/LICENSE-2.0.
6  * In applying this licence, ECMWF does not waive the privileges and immunities
7  * granted to it by virtue of its status as an intergovernmental organisation nor
8  * does it submit to any jurisdiction.
9  */
10 
#include "odc/Partitioner.h"

#include <stdexcept>

#include "eckit/io/PartFileHandle.h"
#include "eckit/io/Offset.h"
#include "eckit/io/Length.h"

#include "odc/core/MetaData.h"
#include "odc/Reader.h"
#include "odc/RowsCounter.h"
#include "odc/Select.h"
#include "odc/SelectIterator.h"
22 
23 using namespace eckit;
24 
25 namespace odc {
26 
27 Partitions Partitioner::createPartitions(const std::vector<eckit::PathName>& files, size_t numberOfPartitions)
28 {
29  std::vector<PathName> indices;
30  for (size_t i (0); i < files.size(); ++i)
31  indices.push_back(files[i] + ".idx");
32 
33  return createPartitions(files, indices, numberOfPartitions);
34 }
35 
36 Partitions Partitioner::createPartitions(const std::vector<eckit::PathName>& files, const std::vector<eckit::PathName>& indices, size_t numberOfPartitions)
37 {
38  if (numberOfPartitions > 2) --numberOfPartitions;
39 
40  Partitions parts;
41  ullong totalRowsNumber (countRows (files, indices));
42  ullong rowsPerPartition ((totalRowsNumber / numberOfPartitions));
43 
44  Log::info() << "*** createPartitions: numberOfPartitions: " << numberOfPartitions << ", totalRowsNumber: " << totalRowsNumber << ", rowsPerPartition: " << rowsPerPartition << std::endl;
45  for (size_t i(0); i < indices.size(); ++i)
46  {
47  odc::Select in("select block_begin, block_length, seqno, n_rows;", indices[i]);
48  for (odc::Select::iterator it (in.begin()), end (in.end()); it != end; ++it)
49  {
50  const ullong blockStart ((*it)[0]),
51  blockLength ((*it)[1]),
52  seqno ((*it)[2]),
53  nRows ((*it)[3]);
54 
55  parts.addReport(files[i], blockStart, blockLength, seqno, nRows, rowsPerPartition);
56 
57 /*
58  if ( parts.back().numberOfRows() + nRows <= rowsPerPartition)
59  parts.back().add(files[i], blockStart, blockLength, seqno, 0, nRows);
60  else
61  {
62  const Block& last (parts.back().blocks().back());
63  ullong firstRow ( newFile ? 0
64  : last.blockStart + last.blockLength == Offset(blockStart) + Length(blockLength)
65  ? parts.back().rowsOnLastBlock()
66  : 0 );
67 
68  parts.push_back(Partition());
69  parts.back().add(files[i], blockStart, blockLength, seqno, firstRow, nRows);
70  }
71  newFile = false;
72 */
73  }
74  }
75  return parts;
76 }
77 
78 ullong Partitioner::countRows(const std::vector<eckit::PathName>& files, const std::vector<eckit::PathName>& indices)
79 {
80  ullong n (0);
81  for (size_t i(0); i < files.size(); ++i)
82  n += RowsCounter::fastRowCount(files[i]);
83  return n;
84 }
85 
86 } // namespace odc
87 
void addReport(const eckit::PathName &fileName, ullong blockStart, ullong blockLength, ullong seqno, ullong nRows, ullong rowsPerPartition)
Definition: Partitions.cc:76
const iterator end()
Definition: Select.cc:77
iterator begin()
Definition: Select.cc:81
Definition: ColumnInfo.h:23
unsigned long long ullong
Definition: Block.h:27