UFO
StuckCheck.cc
Go to the documentation of this file.
1 /*
2  * (C) Copyright 2021 Met Office UK
3  *
4  * This software is licensed under the terms of the Apache Licence Version 2.0
5  * which can be obtained at http://www.apache.org/licenses/LICENSE-2.0.
6  */
7 
9 
10 #include <memory>
11 #include <ostream>
12 #include <stdexcept>
13 #include <string>
14 #include <vector>
15 
16 #include <boost/none.hpp>
17 #include "eckit/config/Configuration.h"
18 #include "ioda/ObsDataVector.h"
19 #include "ioda/ObsSpace.h"
20 #include "oops/base/Variables.h"
21 #include "oops/util/DateTime.h"
22 #include "oops/util/Duration.h"
23 #include "oops/util/Logger.h"
28 
29 namespace ufo {
30 
31 StuckCheck::StuckCheck(ioda::ObsSpace &obsdb, const Parameters_ &parameters,
32  std::shared_ptr<ioda::ObsDataVector<int> > flags,
33  std::shared_ptr<ioda::ObsDataVector<float> > obserr)
34  : FilterBase(obsdb, parameters, flags, obserr), options_(parameters)
35 {
36  obsGroupDateTimes_.reset(new std::vector<util::DateTime>);
37  oops::Log::debug() << "StuckCheck: config = " << options_ << '\n';
38 }
39 
40 // Required for the correct destruction of ObsGroupDateTimes_.
42 {}
43 
44 /// The filter removes observations if they are part of a 'streak'. A streak is where the number
45 /// of identical observation values in sequence (for a given variable) is greater than a user
46 /// defined count.
47 /// To be a streak, it must also continue for longer than a user-defined duration or every
48 /// observation in the station's group must have an identical value.
49 void StuckCheck::applyFilter(const std::vector<bool> & apply,
50  const Variables & filtervars,
51  std::vector<std::vector<bool>> & flagged) const {
53  const std::vector<size_t> validObsIds = obsAccessor.getValidObservationIds(apply);
55  "MetaData", "datetime");
56  // Create groups based on record number (assumed station ID) or category variable
57  // (stationIdVariable) or otherwise assume observations all taken by the same station (1 group)
58  RecursiveSplitter splitter = obsAccessor.splitObservationsIntoIndependentGroups(validObsIds);
59  TrackCheckUtils::sortTracksChronologically(validObsIds, obsAccessor, splitter);
60  std::vector<bool> isRejected(validObsIds.size(), false);
61  std::vector<std::string> filterVariables = filtervars.toOopsVariables().variables();
62  // Iterates through observations to see how long each variable is stuck on one observation
63  for (std::string const& variable : filterVariables) {
64  size_t stationNumber = 0;
65  if (!obsdb_.has("ObsValue", variable)) {
66  std::string errorMessage =
67  "StuckCheck Error: ObsValue vector for " + variable + " not found.\n";
68  throw std::invalid_argument(errorMessage);
69  }
70  const std::vector<float> variableValues = obsAccessor.getFloatVariableFromObsSpace(
71  "ObsValue", variable);
72  for (auto station : splitter.multiElementGroups()) {
73  std::string stationId = std::to_string(stationNumber);
74  std::vector<float> variableDataStation = collectStationVariableData(
75  station.begin(), station.end(), validObsIds, variableValues);
76  // the working variable's value associated with the prior observation
77  float previousObservationValue;
78  float currentObservationValue;
79  size_t firstSameValueIndex = 0; // the first observation in the current streak
80  for (size_t observationIndex = 0; observationIndex < variableDataStation.size();
81  observationIndex++) {
82  currentObservationValue = variableDataStation.at(observationIndex);
83  if (observationIndex == 0) {
84  previousObservationValue = currentObservationValue;
85  } else {
86  if (currentObservationValue == previousObservationValue) {
87  // If the last observation of the track is part of a streak, the full streak will need
88  // to be checked at this point.
89  if (observationIndex == variableDataStation.size() - 1) {
91  station.end(),
92  validObsIds,
93  firstSameValueIndex,
94  observationIndex,
95  isRejected,
96  stationId);
97  }
98  } else { // streak ended in the previous observation
100  station.end(),
101  validObsIds,
102  firstSameValueIndex,
103  observationIndex - 1,
104  isRejected,
105  stationId);
106  // start the streak with the current observation and reset the count to 1
107  firstSameValueIndex = observationIndex;
108  previousObservationValue = currentObservationValue;
109  }
110  }
111  }
112  stationNumber++;
113  }
114  }
115  obsAccessor.flagRejectedObservations(isRejected, flagged);
116 }
117 
118 void StuckCheck::print(std::ostream & os) const {
119  os << "StuckCheck: config = " << config_ << '\n';
120 }
121 
122 /// \returns a vector containing all of the necessary data to run this filter for each observation,
123 /// stored by observation.
125  std::vector<size_t>::const_iterator stationObsIndicesBegin,
126  std::vector<size_t>::const_iterator stationObsIndicesEnd,
127  const std::vector<size_t> &validObsIds,
128  const std::vector<float> &globalData) const {
129  std::vector<float> stationData;
130  stationData.reserve(stationObsIndicesEnd - stationObsIndicesBegin);
131  size_t observationNumber = 0;
132  for (std::vector<size_t>::const_iterator it = stationObsIndicesBegin;
133  it != stationObsIndicesEnd; ++it) {
134  const size_t obsId = validObsIds.at(*it);
135  stationData.push_back(globalData[obsId]);
136  observationNumber++;
137  }
138  return stationData;
139 }
140 
142  std::vector<size_t>::const_iterator stationIndicesBegin,
143  std::vector<size_t>::const_iterator stationIndicesEnd,
144  const std::vector<size_t> &validObsIds,
145  size_t startOfStreakIndex,
146  size_t endOfStreakIndex,
147  std::vector<bool> &isRejected,
148  std::string stationId = "") const {
149 
150  auto getObservationTime = [this, &stationIndicesBegin, &validObsIds] (
151  size_t offsetFromBeginning)->util::DateTime{
152  const size_t obsIndex = validObsIds.at(*(stationIndicesBegin + offsetFromBeginning));
153  return obsGroupDateTimes_->at(obsIndex);
154  };
155 
156  auto rejectObservation = [&validObsIds, &isRejected, &stationIndicesBegin, &stationId](
157  size_t observationIndex) {
158  const size_t obsIndex = validObsIds.at(*(stationIndicesBegin + observationIndex));
159  isRejected[obsIndex] = true;
160  };
161 
162  size_t streakLength = endOfStreakIndex - startOfStreakIndex + 1;
163  if (streakLength <= options_.core.numberStuckTolerance) {
164  return;
165  }
166 
167  size_t stationLength = stationIndicesEnd - stationIndicesBegin;
168 
169  if (streakLength < stationLength) {
170  util::DateTime firstStreakObservationTime = getObservationTime(startOfStreakIndex);
171  util::DateTime lastStreakObservationTime = getObservationTime(endOfStreakIndex);
172  util::Duration streakDuration = lastStreakObservationTime - firstStreakObservationTime;
173  if (streakDuration <= options_.core.timeStuckTolerance) {
174  return;
175  }
176  }
177  // reject all observations within streak
178  for (size_t indexToReject = startOfStreakIndex;
179  indexToReject <= endOfStreakIndex;
180  indexToReject++)
181  rejectObservation(indexToReject);
182 }
183 
184 } // namespace ufo
Base class for UFO QC filters.
Definition: FilterBase.h:45
const eckit::LocalConfiguration config_
Definition: FilterBase.h:59
This class provides access to observations that may be held on multiple MPI ranks.
Definition: ObsAccessor.h:56
RecursiveSplitter splitObservationsIntoIndependentGroups(const std::vector< size_t > &validObsIds, bool opsCompatibilityMode=false) const
Definition: ObsAccessor.cc:197
std::vector< size_t > getValidObservationIds(const std::vector< bool > &apply, const ioda::ObsDataVector< int > &flags, const Variables &filtervars, bool validIfAnyFilterVariablePassedQC=true) const
Return the IDs of observation locations that should be treated as valid by a filter.
Definition: ObsAccessor.cc:99
std::vector< float > getFloatVariableFromObsSpace(const std::string &group, const std::string &variable) const
Definition: ObsAccessor.cc:144
std::vector< util::DateTime > getDateTimeVariableFromObsSpace(const std::string &group, const std::string &variable) const
Definition: ObsAccessor.cc:159
void flagRejectedObservations(const std::vector< bool > &isRejected, std::vector< std::vector< bool > > &flagged) const
Update flags of observations held on the current MPI rank.
Definition: ObsAccessor.cc:243
ioda::ObsSpace & obsdb_
Partitions an array into groups of elements equivalent according to certain criteria.
MultiElementGroupRange multiElementGroups() const
Return the range of equivalence classes consisting of more than one element.
oops::RequiredParameter< util::Duration > timeStuckTolerance
oops::RequiredParameter< size_t > numberStuckTolerance
std::unique_ptr< std::vector< util::DateTime > > obsGroupDateTimes_
StuckCheck(ioda::ObsSpace &obsdb, const Parameters_ &parameters, std::shared_ptr< ioda::ObsDataVector< int > > flags, std::shared_ptr< ioda::ObsDataVector< float > > obserr)
Definition: StuckCheck.cc:31
~StuckCheck() override
Definition: StuckCheck.cc:41
std::vector< float > collectStationVariableData(std::vector< size_t >::const_iterator stationObsIndicesBegin, std::vector< size_t >::const_iterator stationObsIndicesEnd, const std::vector< size_t > &validObsIds, const std::vector< float > &globalData) const
Definition: StuckCheck.cc:124
void potentiallyRejectStreak(std::vector< size_t >::const_iterator stationIndicesBegin, std::vector< size_t >::const_iterator stationIndicesEnd, const std::vector< size_t > &validObsIds, size_t startOfStreakIndex, size_t endOfStreakIndex, std::vector< bool > &isRejected, std::string stationId) const
Definition: StuckCheck.cc:141
void applyFilter(const std::vector< bool > &, const Variables &, std::vector< std::vector< bool >> &) const override
Definition: StuckCheck.cc:49
void print(std::ostream &) const override
Definition: StuckCheck.cc:118
Options controlling the operation of stuck check filter.
StuckCheckCoreParameters core
oops::OptionalParameter< Variable > stationIdVariable
oops::Variables toOopsVariables() const
Definition: Variables.cc:156
ObsAccessor createObsAccessor(const boost::optional< Variable > &stationIdVariable, const ioda::ObsSpace &obsdb)
Create an ObsAccessor object providing access to observations that need to be checked by the current ...
void sortTracksChronologically(const std::vector< size_t > &validObsIds, const ObsAccessor &obsAccessor, RecursiveSplitter &splitter)
Definition: RunCRTM.h:27