Kapacitor data aggregation does not save aggregated data for all series

As discussed in the #influxdb channel on Slack, here's my complete write-up of the issue I'm having.

Our set-up

I have one large measurement in my database, called log_record. This measurement stores sensor values for a lot of sensors. There are two retention policies:

  • four_weeks; contains four weeks of “raw” sensor data, at roughly a 10 s interval
  • forever; contains averages over 2 minutes and keeps that data forever

Every point in these retention policies is tagged with a “sensor_id”, which identifies the sensor’s metadata that we store in a MySQL database.

Data in the forever retention policy are averages which are calculated based on the data in the four_weeks retention policy.
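
In InfluxQL terms, the retention policies look roughly like this (simplified; the replication factor and exact durations may differ):

CREATE RETENTION POLICY "four_weeks" ON "mydb" DURATION 4w REPLICATION 1
CREATE RETENTION POLICY "forever" ON "mydb" DURATION INF REPLICATION 1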

Continuous query

So first, to calculate the averages which are stored in the forever retention policy, I used a continuous query, like this:

CREATE CONTINUOUS QUERY cq_aggregate_log_record ON mydb 
BEGIN 
    SELECT mean(value) AS value 
    INTO mydb.forever.log_record 
    FROM mydb.four_weeks.log_record 
    GROUP BY time(2m), * 
END

This worked for a while, until I found out that not all sensor data was being averaged correctly; in fact, a lot of sensors did not receive averages at all, even though their data was certainly present in the four_weeks retention policy.

After trying and failing to fix this continuous query, I gave up and switched to a script that was run every 2 minutes, containing this query:

SELECT MEAN(value) AS value
INTO mydb.forever.log_record
FROM mydb.four_weeks.log_record
WHERE time > {{{ ten minutes ago, calculated by my script }}}
GROUP BY time(2m), *

This worked like a charm and correctly calculated the averages for all sensors. The continuous query issue was never solved, but at least things worked with my script.
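
Roughly, that script boils down to something like this (a simplified sketch; it assumes a shell script using the influx CLI and GNU date, and the real script may differ):

#!/bin/sh
# Run from cron every 2 minutes.
# Re-aggregate the last 10 minutes so that late-arriving points are still covered.
START=$(date -u -d '10 minutes ago' +%s)

influx -database 'mydb' -execute "
SELECT MEAN(value) AS value
INTO mydb.forever.log_record
FROM mydb.four_weeks.log_record
WHERE time > ${START}s
GROUP BY time(2m), *"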

Kapacitor

However, we recently started using Kapacitor for alerting, and I found out that Kapacitor can also act as a continuous query engine. So I set about building a Kapacitor task that would aggregate my sensor data, so I could finally get rid of my dirty script.

Batch task

I found this page and altered it a bit, to come to this:

batch
|query('SELECT mean(value) as value from "mydb"."four_weeks"."log_record"')
    .period(10m)
    .every(2m)
    .groupBy(time(2m), *)
    .align()
    .fill('none')
|influxDBOut()
    .database('mydb')
    .retentionPolicy('forever')
    .measurement('log_record')
    .precision('s')
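
The task was defined and enabled in the usual way, something like this (the task and file names here are just placeholders):

kapacitor define aggregate_log_record -type batch -tick aggregate_log_record.tick -dbrp mydb.four_weeks
kapacitor enable aggregate_log_record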

This worked, until I found out that the exact same issue was occurring as with the continuous query I had tried before: a lot of my sensor data was simply not being averaged.

So I tried the stream example instead, but it behaved exactly the same: not all sensor data was being averaged.
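
The stream variant was along these lines (reconstructed from the docs example, so details may differ slightly from what I actually ran):

stream
|from()
    .database('mydb')
    .retentionPolicy('four_weeks')
    .measurement('log_record')
    .groupBy(*)
|window()
    .period(2m)
    .every(2m)
    .align()
|mean('value')
    .as('value')
|influxDBOut()
    .database('mydb')
    .retentionPolicy('forever')
    .measurement('log_record')
    .precision('s')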

When testing the stream task by recording a query and replaying that data over the task, I did find that InfluxDB was correctly returning all averages for all my sensors. This is a gist of the response I got from InfluxDB when curling the query.

Note that all data up to line 2414 is correctly written back to InfluxDB by Kapacitor, using the InfluxDBOut node. All data after line 2414 is not written back.
Nothing seems to be wrong with the data on line 2414, though…

I am completely at a loss as to where this issue might be coming from. Please, @nathaniel, do you have any idea why this is happening?

Nothing jumps out as the immediate cause, so let’s jump in.

Some things that initially seemed odd to me:

  • How is data being written to InfluxDB/Kapacitor? Is it possible that data arrives late and therefore misses the cutoff for the CQ? This could explain why the stream task didn’t work as well as all the other methods. No other issue would seem to affect stream as well as batch.
  • The batch query you are performing in the TICKscript creates overlapping windows.
    Specifically every 2m you are querying the last 10m of data grouped by 2m.
    So every 2m you query 5 points out of InfluxDB and write them back using Kapacitor.
    Since this is being done every 2m and each point represents 2m of time, 4 out of the 5 points are redundant.
    Is this intentional? If so why?
  • My initial guess is that whatever caused the CQ to fail also causes Kapacitor to fail. Can you confirm that Kapacitor received all of the data? This can be done by checking the stats on the query node via the kapacitor show task_name command. This way we can narrow down the issue to either the InfluxDB or Kapacitor side.

Sorry if some of these questions were already answered in the Slack discussion, but I think documenting their answers here will help capture all the relevant info.

[quote]
How is data being written to InfluxDB/Kapacitor? Is it possible that data arrives late and therefore misses the cutoff for the CQ? This could explain why the stream task didn’t work as well as all the other methods. No other issue would seem to affect stream as well as batch.[/quote]

This was indeed an issue: we had a few clients that were consistently posting data with a timestamp of now-5m, because of out-of-sync clocks. However, I worked around that by using period(10m), which to my knowledge always looks back over a period of 10 minutes.

So yes, the overlapping windows are intentional, because of those out-of-sync clocks logging “back in time”.

I’m not sure whether kapacitor is correctly parsing the results that InfluxDB is returning for the query, but I am sure that InfluxDB is returning all desired results.

I am reproducing the bug using kapacitor record and kapacitor replay, but as far as I can see those replays do not affect the stats kept by Kapacitor for each task. Is there a way to see the number of points parsed during a replay?
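
For completeness, the record/replay workflow is roughly this (flags as in Kapacitor 1.x; the task name and recording ID are placeholders):

kapacitor record batch -task aggregate_log_record -past 20m
kapacitor replay -recording RECORDING_ID -task aggregate_log_record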

So as an addition to the above, we ran a few more tests with different values for period and every. For each set of parameters we recorded 20 minutes of data in Kapacitor and replayed the task against those 20 minutes of data. Then we checked how many points were written to the forever retention policy by the InfluxDBOut node. The following table contains our results.

Note that when using period(2m) and every(2m) together, the number of points written was 30242, which is probably the correct number of points that should be written. However, as described above, we can’t use period(2m) because some of our clients are using timestamps that are five minutes in the past. So period(2m) will never contain those points.
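
(As a rough sanity check, assuming the set of series stays constant: 20 minutes of data grouped into 2m windows gives 10 windows per series, so 30242 points would correspond to roughly 3024 distinct series.)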

| period | every | group by | points written to InfluxDB |
|--------|-------|----------|----------------------------|
| 2m     | 2m    | 2m       | 30242                      |
| 10m    | 2m    | 2m       | 22885                      |
| 20m    | 2m    | 2m       | 16212                      |
| 10m    | 10m   | 2m       | 16443                      |
| 20m    | 20m   | 2m       | 8567                       |

Shouldn’t all these settings have resulted in the same number of points being written to InfluxDB?

These results are consistently reproducible on the data set we used.

We also enabled debug logging in Kapacitor, and found that the “collected” and “emitted” stats for the query->influxDBOut nodes report different numbers of points than what actually ends up in the forever retention policy. This also seems weird to me.
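
For reference, debug logging was enabled through the logging section of kapacitor.conf, roughly like this (the log path is just an example):

[logging]
  file = "/var/log/kapacitor/kapacitor.log"
  level = "DEBUG"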

Can you share the output of kapacitor show for one of the runs?

This will help determine where Kapacitor may be dropping data.

The lagged data is probably the reason the CQs didn’t work, since they run in real time. The lagged values would also break the stream approach, as it also runs in real time.

Using the longer period helps to fix incorrect values in the past but it duplicates work.

Maybe using the .offset parameter would be simpler and would help us get to the bottom of it. See https://docs.influxdata.com/kapacitor/v1/reference/nodes/query_node/ Using an offset of, say, 10m means that Kapacitor only queries data that is at least 10m old, ensuring all the raw data has arrived. Then every and period can be the same.
So let’s try this TICKscript to simplify the problem:

batch
|query('SELECT mean(value) as value from "mydb"."four_weeks"."log_record"')
    .offset(10m)
    .period(2m)
    .every(2m)
    // no need to group by time since we will only get one point for the period
    .groupBy(*)
    .align()
|influxDBOut()
    .database('mydb')
    .retentionPolicy('forever')
    .measurement('log_record')
    .precision('s')

As for the table of results:

[quote]
Shouldn’t all these settings have resulted in the same number of points being written to InfluxDB?[/quote]

Almost: all rows where the period and every values are equal should have the same number of points.
But this depends on a consistent amount of data arriving during each test, which doesn’t seem to be controlled.

This was tested on data recorded using Kapacitor and replayed against the task, so the amount of source data should be the same on every run.

No, because the test runs are done using kapacitor replay and kapacitor record, which don’t store this data for kapacitor show.

That sounds reasonable.

As I was replaying the task on recorded batch data, this shouldn’t make any difference, right? In real time, the 5-minute clock difference would matter, but in the recording we always have data for the last 2 minutes… Or does Kapacitor take into account when the data was written, and not only the timestamp given to that data?

@nathaniel, any idea? We’re currently moving to a system where we don’t rely on our clients’ computers’ date and time, and instead use the time at which the values arrive at our server. This should fix the out-of-sync clock problem, and we hope it also fixes our Kapacitor troubles.

@jaapz

You can get the show output for a recording/replay by using the -replay flag of the show command, something like:

kapacitor show -replay REPLAY_ID TASK_ID

You are right: when using the recording/replay features, offset will not help much, since the queries are performed when the recording is created.

[quote]
Or does Kapacitor take into account when the data was written, and not only the timestamp given to that data?[/quote]

Kapacitor does not take into account when the data was written; only the timestamp of the data is used. But if the data doesn’t exist in the database when the recording is created, then it won’t be saved in the recording.