Commit 233b4768 authored by Alessio Netti's avatar Alessio Netti

Analytics: minor changes to timing of queries

- Job Operator job queries now use the timestamp for the next scheduled
invocation to define their time range
- Additional timestamp checks in PersystSQL
parent b57cedf4
......@@ -289,7 +289,9 @@ protected:
virtual void computeAsync() override {
try {
_jobDataVec.clear();
if(this->_queryEngine.queryJob("", this->_interval * 1000000, 0, _jobDataVec, true, true)) {
uint64_t queryTsEnd = !this->_scheduledTime ? getTimestamp() : this->_scheduledTime;
uint64_t queryTsStart = this->_scheduledTime - (this->_interval * 1000000);
if(this->_queryEngine.queryJob("", queryTsStart, queryTsEnd, _jobDataVec, false, true)) {
_tempUnits.clear();
// Producing units from the job data, discarding invalid jobs in the process
for(auto& job : _jobDataVec) {
......@@ -329,7 +331,8 @@ protected:
}
if (this->_timer && this->_keepRunning) {
this->_timer->expires_at(timestamp2ptime(this->nextReadingTime()));
this->_scheduledTime = this->nextReadingTime();
this->_timer->expires_at(timestamp2ptime(this->_scheduledTime));
this->_pendingTasks++;
this->_timer->async_wait(bind(&JobOperatorTemplate::computeAsync, this));
}
......
......@@ -72,7 +72,8 @@ public:
OperatorInterface(name),
_unitCache(nullptr),
_insertionLUT(nullptr),
_queryEngine(QueryEngine::getInstance()) {}
_queryEngine(QueryEngine::getInstance()),
_scheduledTime(0) {}
/**
* @brief Copy constructor
......@@ -86,7 +87,8 @@ public:
OperatorInterface(other),
_unitCache(nullptr),
_insertionLUT(nullptr),
_queryEngine(QueryEngine::getInstance()) {
_queryEngine(QueryEngine::getInstance()),
_scheduledTime(0) {
for(auto u : other._units) {
_units.push_back(u);
......@@ -103,6 +105,7 @@ public:
OperatorTemplate& operator=(const OperatorTemplate& other) {
OperatorInterface::operator=(other);
_units.clear();
_scheduledTime = 0;
for(auto u : other._units) {
_units.push_back(u);
......@@ -510,7 +513,8 @@ protected:
}
if (_timer && _keepRunning && !_disabled) {
_timer->expires_at(timestamp2ptime(nextReadingTime()));
_scheduledTime = nextReadingTime();
_timer->expires_at(timestamp2ptime(_scheduledTime));
_pendingTasks++;
_timer->async_wait(bind(&OperatorTemplate::computeAsync, this));
}
......@@ -538,6 +542,8 @@ protected:
vector<UnitPtr> _baseUnits;
// Instance of a QueryEngine object to get sensor data
QueryEngine& _queryEngine;
// Internal timestamp to keep track of the time at which an operator will be invoked
uint64_t _scheduledTime;
};
#endif //PROJECT_OPERATORTEMPLATE_H
......@@ -147,6 +147,14 @@ void PerSystSqlOperator::compute(U_Ptr unit, qeJobData& jobData) {
size_t elCtr = 0;
uint64_t tolerance_ms = (uint64_t)_interval * 3 / 2;
uint64_t my_timestamp = getTimestamp() - _go_back_ns;
// Too early to fetch data for the job
if(my_timestamp < jobData.startTime)
return;
// We snap the timestamp to the job's end time if outside of its boundaries
else if(jobData.endTime!=0 && my_timestamp > jobData.endTime)
my_timestamp = jobData.endTime;
// Job units are hierarchical, and thus we iterate over all sub-units associated to each single node
std::vector<std::string> vectorOfSensorNames;
for (const auto& subUnit : unit->getSubUnits()) {
......
Markdown is supported
0% or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment