2.12.2021, 9:00 - 11:00: Due to updates GitLab may be unavailable for some minutes between 09:00 and 11:00.

sensorcache.cpp 5.9 KB
Newer Older
1
2
3
//================================================================================
// Name        : sensorcache.cpp
// Author      : Michael Ott
// Contact     : info@dcdb.it
// Copyright   : Leibniz Supercomputing Centre
// Description :
//================================================================================

//================================================================================
// This file is part of DCDB (DataCenter DataBase)
// Copyright (C) 2016-2019 Leibniz Supercomputing Centre
//
// This program is free software; you can redistribute it and/or
// modify it under the terms of the GNU General Public License
// as published by the Free Software Foundation; either version 2
// of the License, or (at your option) any later version.
//
// This program is distributed in the hope that it will be useful,
// but WITHOUT ANY WARRANTY; without even the implied warranty of
// MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
// GNU General Public License for more details.
//
// You should have received a copy of the GNU General Public License
// along with this program; if not, write to the Free Software
// Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA  02110-1301, USA.
//================================================================================

#include "sensorcache.h"

/**
 * Creates an empty sensor cache.
 *
 * @param maxHistory  Value kept in _maxHistory; it is passed to the
 *                    CacheEntry constructor whenever a new sensor is cached.
 */
SensorCache::SensorCache(uint64_t maxHistory) {
    _maxHistory = maxHistory;
    // Start with no active accessors and no update in progress.
    _access.store(0);
    _updating.store(false);
}

/**
 * Empties the sensor map on destruction.
 */
SensorCache::~SensorCache() {
    this->sensorCache.clear();
}

40
/**
 * Gives direct access to the underlying sensor map.
 *
 * @return Mutable reference to the map. This accessor itself does not call
 *         wait()/release(), so callers get the container unguarded.
 */
sensorCache_t& SensorCache::getSensorMap() {
    return this->sensorCache;
}

/**
 * Stores one reading for a sensor, creating the sensor's cache entry if it
 * does not exist yet.
 *
 * The common case (entry already present) runs between wait() and release().
 * Only when the sensor is unknown does the function take the exclusive
 * _updating flag and wait for _access to drop to zero before touching the
 * map structure.
 * NOTE(review): wait()/release() are declared outside this file; presumably
 * they register/unregister a concurrent reader via _access - confirm.
 *
 * @param sid  Sensor identifier; its reserved bytes are zeroed before lookup.
 * @param ts   Timestamp of the reading.
 * @param val  Value of the reading.
 */
void SensorCache::storeSensor(SensorId sid, uint64_t ts, int64_t val) {
  reading_t s = { val, ts };
  /* Remove the reserved bytes to leverage the standard find function */
  sid.setRsvd(0);

  // Fast path: the sensor is already cached, no structural change needed.
  wait();
  auto sIt = sensorCache.find(sid);
  const bool known = (sIt != sensorCache.end());
  if (known) {
      sIt->second.store(s);
  }
  release();
  if (known) {
      return;
  }

  // Slow path: spin until we own the update flag and all accessors are gone.
  while (_updating.exchange(true)) {}
  while (_access.load() > 0) {}

  // Another thread may have inserted this sensor between release() above and
  // acquiring the update flag. Look again so an existing entry (and the
  // readings it already holds) is not overwritten by a fresh CacheEntry.
  sIt = sensorCache.find(sid);
  if (sIt == sensorCache.end()) {
      auto& fresh = sensorCache[sid];
      fresh = CacheEntry(_maxHistory);
      fresh.store(s);
  } else {
      sIt->second.store(s);
  }
  _updating.store(false);
}

64
65
66
67
/**
 * Convenience overload: unpacks a SensorDataStoreReading and forwards its
 * sensor id, raw timestamp and value to the three-argument storeSensor().
 *
 * @param s  Reading to store.
 */
void SensorCache::storeSensor(const SensorDataStoreReading& s) {
    const uint64_t rawTs = s.timeStamp.getRaw();
    storeSensor(s.sensorId, rawTs, s.value);
}

Alessio Netti's avatar
Alessio Netti committed
68
69
70
/**
 * Looks up a cached sensor and returns either its latest value or an average.
 *
 * @param sid  Sensor identifier; its reserved bytes are zeroed before lookup.
 * @param avg  If non-zero, passed to CacheEntry::getAverage() and that result
 *             is returned; if zero, the latest reading's value is returned.
 * @return The requested value.
 * @throws std::invalid_argument  if the sensor is not in the cache.
 * @throws std::out_of_range      if the entry's checkValid() returns false.
 */
int64_t SensorCache::getSensor(SensorId sid, uint64_t avg) {
  /* Remove the reserved bytes to leverage the standard find function */
  sid.setRsvd(0);

  wait();
  const auto entryIt = sensorCache.find(sid);
  const bool missing = (entryIt == sensorCache.end());

  // Both failure modes must call release() before throwing.
  if (missing || !entryIt->second.checkValid()) {
    release();
    if (missing) {
      throw std::invalid_argument("Sid not found");
    }
    throw std::out_of_range("Sid outdated");
  }

  int64_t val;
  if (avg != 0) {
    val = entryIt->second.getAverage(avg);
  } else {
    val = entryIt->second.getLatest().value;
  }
  release();
  return val;
}

// Wildcards are not supported with string topics, so the wildcard-aware
// string overload of getSensor() below is kept only as commented-out code.
//int64_t SensorCache::getSensor(std::string topic, uint64_t avg) {
//  topic.erase(std::remove(topic.begin(), topic.end(), '/'), topic.end());
//
//  size_t wp = topic.find("*");
//  if (wp == std::string::npos) {
//    return getSensor(DCDB::SensorId(topic), avg);
//  }
//
//  int wl = 29 - topic.length();
// 
//  /* Create SensorIds with the lowest and highest values matching the wildcard */
//  DCDB::SensorId sidLow(std::string(topic).replace(wp, 1, std::string(wl, '0')));
//  DCDB::SensorId sidHi(std::string(topic).replace(wp, 1, std::string(wl, 'f')));
//  DCDB::SensorId sidMask(std::string(wp, 'f') + std::string(wl, '0') + std::string(topic.length()-wp-1, 'f'));
//  sidLow.setRsvd(0);
//  sidHi.setRsvd(0);
//
//  /* See whether there's a SensorId in the cache >= sidLow */
//  sensorCache_t::iterator it = sensorCache.lower_bound(sidLow);
//  sensorCache_t::iterator mostRecentSidIt = sensorCache.end();
//
//  bool foundOne = false;
//  /* Iterate over the cache until the current entry is > sidHi */
//  while ((it != sensorCache.end()) && (it->first <= sidHi)) {
//    if ((it->first & sidMask) == sidLow) {
//      foundOne = true;
//      /* We only return one value, even if multiple SensorIds would match.
//       * At least make sure it's the most recent value
//       */
//      if (it->second.checkValid() && ((mostRecentSidIt == sensorCache.end()) || mostRecentSidIt->second.getLatest().timestamp < it->second.getLatest().timestamp)) {
//        mostRecentSidIt = it;
//      }
//    }
//    it++;
//  }
//
//  if (mostRecentSidIt == sensorCache.end()) {
//    /* Check whether we actually found at least an outdated entry */
//    if (foundOne) {
//      throw std::out_of_range("Sid outdated");
//    } else {
//      throw std::invalid_argument("Sid not found");
//    }
//  }
//
//  if (avg) {
//    return mostRecentSidIt->second.getAverage(avg);
//  } else {
//    return mostRecentSidIt->second.getLatest().value;
//  }
//}
Alessio Netti's avatar
Alessio Netti committed
142
143
144
145

void SensorCache::dump() {
  std::cout << "SensorCache Dump:" << std::endl;
  for (sensorCache_t::iterator sit = sensorCache.begin(); sit != sensorCache.end(); sit++) {
146
    std::cout << "  id=" << sit->first.getId() << " data=[";
Alessio Netti's avatar
Alessio Netti committed
147
148
149
150
151
152
153
154
155
156
157
158
159
    for (std::vector<reading_t>::const_iterator eit = sit->second.getRaw()->begin(); eit != sit->second.getRaw()->end(); eit++) {
      if (eit != sit->second.getRaw()->begin()) {
        std::cout << ",";
      }
      std::cout << "(" <<  eit->value << "," << eit->timestamp/NS_PER_S << "." << std::setfill ('0') << std::setw (9) << eit->timestamp%NS_PER_S << ")";
    }
    std::cout << "]" << std::endl;
  }
}

uint64_t SensorCache::clean(uint64_t t) {
    uint64_t thresh = getTimestamp() - t;
    uint64_t ctr = 0;
160
161
    // Spinning on the lock
    while (_updating.exchange(true)) {}
162
    while(_access.load()>0) {}
Alessio Netti's avatar
Alessio Netti committed
163
164
165
166
167
168
169
170
    for (auto it = sensorCache.cbegin(); it != sensorCache.cend();) {
        uint64_t latestTs = it->second.getLatest().timestamp;
        if (latestTs!=0 && latestTs < thresh) {
            it = sensorCache.erase(it);
            ctr++;
        } else
            ++it;
    }
171
    _updating.store(false);
Alessio Netti's avatar
Alessio Netti committed
172
173
    return ctr;
}