InfluxDB CQ vs Kapacitor Batch vs Kapacitor Stream

Hello,

Moving the questions from here.

I’m trying to understand the behaviour of an InfluxDB CQ compared with a Kapacitor batch task and a Kapacitor stream task.

For that purpose, I built what I intend to be equivalent configurations for the 3 cases. Each one calculates the mean of a field called responseTime, which contains the response time of a service request, grouped by 1 minute:

CREATE CONTINUOUS QUERY meanResponseTime_1m_cq ON events
RESAMPLE EVERY 1m
BEGIN
  SELECT mean(responseTime) INTO meanResponseTime_1m_cq FROM test123_11 GROUP BY time(1m)
END

batch
    |query('
         SELECT responseTime
         FROM "events"."retention_policy".test123_11
    ')
        .every(1m)
        .period(1m)
        .align()
    |mean('responseTime')
    |influxDBOut()
        .database('events')
        .measurement('meanResponseTime_1m_batch')
        .flushInterval(1s)

batch
    |query('
         SELECT mean(responseTime)
         FROM "events"."retention_policy".test123_11
    ')
        .every(1m)
        .period(1m)
        .align()
    |influxDBOut()
        .database('events')
        .measurement('meanResponseTime_1m_batch_mean_in_query')
        .flushInterval(1s)

stream
    |from()
        .database('events')
        .retentionPolicy('retention_policy')
        .measurement('test123_11')
    |window()
        .period(1m)
        .every(1m)
        .align()
    |mean('responseTime')
    |influxDBOut()
        .database('events')
        .measurement('meanResponseTime_1m_stream')
        .flushInterval(1s)

The first question is: are these configurations equivalent?

Visualizing the data with Grafana (with the following queries), it can be seen that the resulting data is quite different:

SELECT mean("responseTime") FROM "test123_11"                              WHERE $timeFilter GROUP BY time(1m)
SELECT mean                 FROM "meanResponseTime_1m_cq"                  WHERE $timeFilter
SELECT mean                 FROM "meanResponseTime_1m_stream"              WHERE $timeFilter
SELECT mean                 FROM "meanResponseTime_1m_batch"               WHERE $timeFilter
SELECT mean                 FROM "meanResponseTime_1m_batch_mean_in_query" WHERE $timeFilter

  1. Why do they behave differently?

  2. The CQ result is exactly the same as querying the original data directly, but it lags by one period, in this case 1 minute. So, if now is 13:02:XX, the last period that can be queried is between 13:01:00 and 13:02:00. Why this behaviour? Why doesn’t the CQ wait until 2 periods have passed (the same behaviour as the batch task that calculates the mean in the query)?

  3. Why, in a Kapacitor batch task, does the result differ between calculating the mean in the batch query and calculating it in the task as a chained node? (It looks like calculating the mean in the query node is more accurate/correct.)

  4. Is the behaviour of the stream compared with the CQ or with the batch?

Additionally, similarly to the last example, I’m trying to understand the behaviour of windows. I would like to calculate, every 2 seconds, the mean of the response time over the last 5 minutes. That is, at time 00:00 (mm:ss), the mean of the response time of the events between -05:00 and 00:00; at 00:02, the mean of the events between -04:58 and 00:02; at 00:04, the mean of the events between -04:56 and 00:04; and so on.

CREATE CONTINUOUS QUERY meanResponseTime_window2s5m_cq ON events
RESAMPLE EVERY 2s
BEGIN
  SELECT mean(responseTime) INTO meanResponseTime_window2s5m_cq FROM test123_11 GROUP BY time(5m)
END

stream
    |from()
        .database('events')
        .retentionPolicy('retention_policy')
        .measurement('test123_11')
    |window()
        .period(5m)
        .every(2s)
        .align()
    |mean('responseTime')
    |influxDBOut()
        .database('events')
        .measurement('meanResponseTime_window2s5m_stream')
        .flushInterval(1s)

batch
    |query('
         SELECT responseTime
         FROM "events"."retention_policy".test123_11
    ')
        .every(2s)
        .period(5m)
        .align()
    |mean('responseTime')
    |influxDBOut()
        .database('events')
        .measurement('meanResponseTime_window2s5m_batch')
        .flushInterval(1s)

batch
    |query('
         SELECT mean(responseTime)
         FROM "events"."retention_policy".test123_11
    ')
        .every(2s)
        .period(5m)
        .align()
    |influxDBOut()
        .database('events')
        .measurement('meanResponseTime_window2s5m_batch_mean_in_query')
        .flushInterval(1s)

Visualizing the data with Grafana (with the following queries), it can be seen that the resulting data is quite different:

SELECT mean FROM "meanResponseTime_window2s5m_cq"                  WHERE $timeFilter
SELECT mean FROM "meanResponseTime_window2s5m_stream"              WHERE $timeFilter
SELECT mean FROM "meanResponseTime_window2s5m_batch"               WHERE $timeFilter
SELECT mean FROM "meanResponseTime_window2s5m_batch_mean_in_query" WHERE $timeFilter

GRAPHIC WINDOW EVERY 2s PERIOD 5m

According to the documentation, the CQ executes at the boundary of the last complete interval, not every 2s over the last 5m as I want. So I cannot use it to obtain the desired behaviour.
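A rough way to see why the CQ cannot produce a sliding window is sketched below. It assumes, as a simplification of the documented behaviour, that each CQ run writes its result at the start of the GROUP BY time(5m) bucket it covers, so runs that are 2 seconds apart keep overwriting the same point. The function names and the integer-second clock are hypothetical and exist only for illustration.

```python
PERIOD = 300  # seconds, stands in for GROUP BY time(5m) / .period(5m)
EVERY = 2     # seconds, stands in for RESAMPLE EVERY 2s / .every(2s)


def cq_output_time(run_time):
    """Timestamp a CQ run would write to (assumed: start of the bucket it covers).

    A run that lands exactly on a boundary is assumed to recompute the
    bucket that has just ended.
    """
    return (run_time - 1) // PERIOD * PERIOD


def sliding_output_time(run_time):
    """Timestamp a sliding window would write to (assumed: the window end)."""
    return run_time


# Ten minutes of runs, one every 2 seconds.
run_times = range(EVERY, 600 + EVERY, EVERY)

cq_points = {cq_output_time(t) for t in run_times}
sliding_points = {sliding_output_time(t) for t in run_times}

print("runs:", len(run_times))
print("distinct CQ output timestamps:", sorted(cq_points))
print("distinct sliding-window output timestamps:", len(sliding_points))
```

Under these assumptions the CQ only ever updates one point per 5-minute bucket, while the sliding window emits a new point on every run.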

The Kapacitor batch task with the mean calculation in the query looks like it waits until the period of time is complete. I mean, the last point that it writes is at now()-5 minutes. Why?

And about Kapacitor stream and the others: why are they so different? Is something wrong in the configuration? If the behaviour is correct, how do they really work?

Sorry if these questions are obvious, but after some time trying to understand this I haven’t found the answer.

Thanks in advance.

@Juann First off thanks for this write up. I’m currently looking into this issue, would be great to get a bit more information. What version of Kapacitor are you using? Can you share your data source?

I’ve started trying to reproduce the issue using the _internal database.

(1) Why do they behave differently?

Still investigating. I’ve reproduced something, but it’s slightly different from what you have. In my repro, both the batch and stream tasks that calculate the mean in kapacitor have the same result. And the CQ and batch task that calculate the mean in the query have the same result.

(2) The CQ result is exactly the same as querying the original data directly, but it lags by one period, in this case 1 minute. So, if now is 13:02:XX, the last period that can be queried is between 13:01:00 and 13:02:00. Why this behaviour? Why doesn’t the CQ wait until 2 periods have passed (the same behaviour as the batch task that calculates the mean in the query)?

I’m not entirely sure I understand what the issue is. To me it looks like the data is able to be queried 1 period after. The reason for this is that the CQ will run at the frequency of the EVERY clause or the frequency of the GROUP BY time. So if the CQ runs at a 1m period, then the data for the previous 1m period should be able to be queried.
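As a minimal sketch of that timing (a simplified model, not InfluxDB's actual scheduler), the interval a 1m CQ can have covered at a given moment is the last complete GROUP BY time bucket. The helper name last_complete_interval is hypothetical.

```python
from datetime import datetime, timedelta, timezone

EPOCH = datetime(1970, 1, 1, tzinfo=timezone.utc)


def last_complete_interval(now, interval):
    """Return (start, end) of the last complete bucket before `now`.

    Buckets are assumed to be aligned to the Unix epoch, as with
    GROUP BY time(1m) and no offset.
    """
    buckets_elapsed = (now - EPOCH) // interval  # whole buckets since epoch
    end = EPOCH + buckets_elapsed * interval
    return end - interval, end


now = datetime(2017, 3, 13, 13, 2, 30, tzinfo=timezone.utc)
start, end = last_complete_interval(now, timedelta(minutes=1))
print(start.isoformat(), "->", end.isoformat())
```

With now at 13:02:30, this yields the interval from 13:01:00 to 13:02:00, which matches the behaviour described in the question.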

(3) Why, in a Kapacitor batch task, does the result differ between calculating the mean in the batch query and calculating it in the task as a chained node? (It looks like calculating the mean in the query node is more accurate/correct.)

Still working on this.

(4) Is the behaviour of the stream compared with the CQ or with the batch?

Not sure what you’re asking here.

As soon as I have an answer to the first section, I’ll start looking into the second section.

Thanks again for this write up!

Following up on my previous post. It appears that everything is working correctly for me. The issue that I was seeing was a design decision in Kapacitor.

The following CQ and Kapacitor task have the same behavior
CQ

create continuous query meanResponseTime_1m_cq on _internal begin
  select mean(queryDurationNs) into meanQueryDuration_1m_cq from queryExecutor
  group by time(1m)
end

Task

//mean_in.tick
batch
  |query('SELECT mean(queryDurationNs) FROM "_internal".monitor.queryExecutor')
    .period(1m)
    .every(1m)
    .align()
  |influxDBOut()
    .database('_internal')
    .measurement('selectMeanQueryDuration_1m_kap')

$ kapacitor define mean_in -tick mean_in.tick -dbrp _internal.monitor -type batch
$ kapacitor enable mean_in

The reason for this is that in the query

SELECT mean(queryDurationNs) FROM "_internal".monitor.queryExecutor WHERE time >= '2017-03-13T17:50:00Z' AND time < '2017-03-13T17:51:00Z'

InfluxDB assigns the resulting timestamp for each of the time buckets. In the example above, the resulting mean value is assigned to '2017-03-13T17:50:00Z', the start of the interval.

In comparison
Batch Task

//mean_out.tick
batch
  |query('SELECT queryDurationNs FROM "_internal".monitor.queryExecutor')
    .period(1m)
    .every(1m)
    .align()
  |mean('queryDurationNs')
  |influxDBOut()
    .database('_internal')
    .measurement('kapMeanQueryDuration_1m_kap')

$ kapacitor define mean_out -tick mean_out.tick -dbrp _internal.monitor -type batch
$ kapacitor enable mean_out

Stream Task

stream
  |from()
    .database('_internal')
    .retentionPolicy('monitor')
    .measurement('queryExecutor')
  |window()
    .period(1m)
    .every(1m)
    .align()
  |mean('queryDurationNs')
  |influxDBOut()
    .database('_internal')
    .measurement('streamQueryDuration_1m_kap')

$ kapacitor define stream_1m -tick stream.tick -dbrp _internal.monitor -type stream
$ kapacitor enable stream_1m

The mean calculation of these two tasks happens inside of Kapacitor, so the data for the query

SELECT mean(queryDurationNs) FROM "_internal".monitor.queryExecutor WHERE time >= '2017-03-13T17:50:00Z' AND time < '2017-03-13T17:51:00Z'

would pick a timestamp of '2017-03-13T17:51:00Z', the end of the interval.

This discrepancy between InfluxDB and Kapacitor is the result of Kapacitor’s ability to do complex joining operations. If you were to join the mean over the last month with the mean over the last day, you would need their resulting values to occur at the same time. Using the most recent time, the time the mean was computed, to represent both points makes sense.
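The two timestamp conventions can be illustrated with a small sketch. The sample values are made up, and window_mean is a hypothetical helper, not part of InfluxDB or Kapacitor. It only shows that the same mean ends up one period apart depending on whether the start or the end of the interval is used as the timestamp.

```python
from datetime import datetime, timedelta, timezone
from statistics import mean


def window_mean(points, start, period, stamp="start"):
    """Mean of values with start <= t < start + period.

    stamp="start" mimics the convention described for InfluxDB queries,
    stamp="end" the one described for aggregations done in Kapacitor nodes.
    """
    end = start + period
    values = [v for t, v in points if start <= t < end]
    if not values:
        return None
    return (start if stamp == "start" else end, mean(values))


start = datetime(2017, 3, 13, 17, 50, tzinfo=timezone.utc)
period = timedelta(minutes=1)

# Made-up samples inside the 17:50 to 17:51 interval.
points = [
    (start + timedelta(seconds=5), 10.0),
    (start + timedelta(seconds=20), 20.0),
    (start + timedelta(seconds=45), 30.0),
]

influx_style = window_mean(points, start, period, stamp="start")
kapacitor_style = window_mean(points, start, period, stamp="end")

print("stamped at start:", influx_style[0].isoformat(), influx_style[1])
print("stamped at end:  ", kapacitor_style[0].isoformat(), kapacitor_style[1])
```

Both calls return the same value; only the timestamp differs, which is why the Kapacitor series appears shifted by one period when plotted next to the CQ series.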

Additionally, similarly to the last example, I’m trying to understand the behaviour of windows. I would like to calculate, every 2 seconds, the mean of the response time over the last 5 minutes. That is, at time 00:00 (mm:ss), the mean of the response time of the events between -05:00 and 00:00; at 00:02, the mean of the events between -04:58 and 00:02; at 00:04, the mean of the events between -04:56 and 00:04; and so on.

This should be possible by doing something similar to the following, which I’ve tested.

Batch Task

//batcher.tick
batch
  |query('SELECT queryDurationNs FROM "_internal".monitor.queryExecutor')
    .period(1m)
    .every(10s)
    .align()
  |log()
  |mean('queryDurationNs')
  |influxDBOut()
    .database('_internal')
    .measurement('kapMeanQueryDuration_1m_kap')

$ kapacitor define batcher -tick batcher.tick -type batch -dbrp _internal.monitor
$ kapacitor enable batcher

Stream Task

//streamer.tick
stream
  |from()
    .database('_internal')
    .retentionPolicy('monitor')
    .measurement('queryExecutor')
  |window()
    .period(1m)
    .every(10s)
    .align()
  |log()
  |mean('queryDurationNs')
  |influxDBOut()
    .database('_internal')
    .measurement('streamQueryDuration_1m_kap')

$ kapacitor define streamer -tick streamer.tick -type stream -dbrp _internal.monitor
$ kapacitor enable streamer

The Kapacitor batch task with the mean calculation in the query looks like it waits until the period of time is complete. I mean, the last point that it writes is at now()-5 minutes. Why?

What do you mean?

And about Kapacitor stream and the others: why are they so different? Is something wrong in the configuration? If the behaviour is correct, how do they really work?

From what I’ve done, it seems like they both work correctly. If I had your data source I could attempt to reproduce it.

@Juann Did I answer all of the questions you had?

Thanks a lot for your answers.

Sorry, but I cannot share the data source. However, I can say that the data is inserted into InfluxDB by Logstash, and Logstash generates the inserted timestamp. Additionally, both Kapacitor and InfluxDB are version 1.2.

Trying to summarize my questions based on your answers:

  1. The behaviour of Kapacitor Batch and Kapacitor Stream should be the same.
  2. In Kapacitor Batch and Kapacitor Stream, when the aggregations are performed as task nodes, the picked timestamp is the end of the interval.
  3. InfluxDB query calculations over a period of time (including CQs and Kapacitor batch tasks that aggregate in the query) take the start of the interval (period or GROUP BY) as the timestamp of the result.
  4. The CQ is not calculated until the interval has passed.

With a load of 20 events/second, in a 2 cores and 8GiB instance (AWS m4.large), writing 2 fields and 1 tag, the results are as expected:

  • The InfluxDB direct query (green line) is equal to the CQ (yellow line), except for the last point of the time series, which the CQ lacks because the interval has not finished.

  • Kapacitor stream (blue line) and Kapacitor batch (orange line) show minor differences between them, and both are shifted one minute after the InfluxDB and CQ time series. This is because the timestamp of the calculation is the end of the interval (period).

For a period=1m and every=1m

Also, with a window with a 5 minutes period and calculated every 2 seconds (just for Kapacitor Stream and Kapacitor Batch):

KAPACITOR WINDOW 5 minutes PERIOD EVERY 2 seconds

Regarding this, is there any way to reproduce with an InfluxDB CQ the behaviour of a Kapacitor batch or stream task that uses a window with a 5 minute period calculated every 2 seconds? (The same as the last graph, but with a CQ.)

Thanks a lot for your answers.

You’re welcome :) I’m happy to help where I can.

Regarding this, is there any way to reproduce with an InfluxDB CQ the behaviour of a Kapacitor batch or stream task that uses a window with a 5 minute period calculated every 2 seconds? (The same as the last graph, but with a CQ.)

Unfortunately there isn’t a way to achieve this kind of functionality using InfluxQL alone. You could get something similar if you had a script that ran every two seconds and executed

SELECT mean(<field>) INTO new_measurement FROM old_measurement WHERE time >= [lower time] and time < [upper time]

The fundamental issue is that CQs require a GROUP BY time clause. If this kind of functionality is important to you I’d recommend opening a feature request on InfluxDB.
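The script idea could look roughly like the sketch below, which only builds the InfluxQL statement for one run. The function name rolling_mean_query, the target measurement name, and the 5 minute default are assumptions for illustration. Actually sending the statement to InfluxDB every two seconds (for example from a scheduler or a loop) is left out.

```python
from datetime import datetime, timedelta, timezone

TIME_FORMAT = "%Y-%m-%dT%H:%M:%SZ"


def rolling_mean_query(now, field, source, target, period=timedelta(minutes=5)):
    """Build a SELECT ... INTO statement covering [now - period, now)."""
    lower = (now - period).strftime(TIME_FORMAT)
    upper = now.strftime(TIME_FORMAT)
    return (
        f'SELECT mean("{field}") INTO "{target}" FROM "{source}" '
        f"WHERE time >= '{lower}' AND time < '{upper}'"
    )


# Example run time; a real script would use the current UTC time instead.
now = datetime(2017, 3, 13, 17, 55, 0, tzinfo=timezone.utc)
query = rolling_mean_query(
    now,
    field="responseTime",
    source="test123_11",
    target="meanResponseTime_window2s5m_script",  # hypothetical measurement name
)
print(query)
```

Each run moves both bounds forward by the time elapsed since the previous run, which gives the sliding 5 minute window that a CQ with GROUP BY time cannot express.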

If there’s anything else I can answer let me know.


Since this functionality can be covered perfectly by Kapacitor, I don’t need to open a request.

In case it is useful to someone: continuing with my tests, I have increased the insertion rate to 700 events/second on the same AWS instance, with the same batch and stream configuration as before:

stream
    |from()
        .database('events')
        .retentionPolicy('retention_policy')
        .measurement('test')
    |window()
        .period(5m)
        .every(2s)
        .align()
    |mean('responseTime')
    |influxDBOut()
        .database('events')
        .measurement('meanResponseTime_window2s5m_stream')
        .flushInterval(1s)

batch
    |query('
         SELECT responseTime
         FROM "events"."retention_policy".test
    ')
        .every(2s)
        .period(5m)
        .align()
    |mean('responseTime')
    |influxDBOut()
        .database('events')
        .measurement('meanResponseTime_window2s5m_batch')
        .flushInterval(1s)

The timeseries are totally different:

BATCH vs STREAM DIFFERENT TIMESERIES with 700events/second load

This issue is due to the InfluxDB configuration: by default, the chunk size is set to 10000

# The default chunk size for result sets that should be chunked.
# max-row-limit = 10000

To avoid this limitation, max-row-limit must be changed to 0.
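To get a feel for the size of the effect, the arithmetic can be sketched as follows. This assumes, as the explanation above suggests, that only the first max-row-limit rows of each 5 minute batch query reach Kapacitor. The response-time values are synthetic (a steady ramp), not measured data.

```python
EVENTS_PER_SECOND = 700
PERIOD_SECONDS = 5 * 60
ROW_LIMIT = 10_000  # default max-row-limit quoted above

# Rows a single 5 minute batch query would need to return: 700 * 300 = 210000.
rows_in_window = EVENTS_PER_SECOND * PERIOD_SECONDS

# How much of the window the first ROW_LIMIT rows cover, in seconds.
seconds_covered = ROW_LIMIT / EVENTS_PER_SECOND

# Synthetic response times that rise steadily across the window.
values = [100.0 + 100.0 * i / rows_in_window for i in range(rows_in_window)]

full_mean = sum(values) / len(values)
truncated_mean = sum(values[:ROW_LIMIT]) / ROW_LIMIT

print("rows in window:", rows_in_window)
print("seconds covered by the first rows:", round(seconds_covered, 1))
print("mean over the full window:", round(full_mean, 2))
print("mean over the truncated result:", round(truncated_mean, 2))
```

Under that assumption the batch task would be averaging only a small slice at the start of each window, while the stream task sees every point, which would be enough to make the two series diverge.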

As can be observed, the shapes of both timelines are similar, but there are significant differences between them. As I understand it, the difference lies in how the data arrives at Kapacitor: with the stream configuration, data is processed as it arrives at InfluxDB (independently of the timestamp of the event), whereas with batch it is queried according to the timestamp stored in InfluxDB. Is this correct?

@Juann That is very weird. My guess is that some of the subscriptions to kapacitor are getting dropped. I’ve got a couple asks. Can I get

  1. A redacted version of your Kapacitor config.
  2. A redacted version of your Kapacitor logs.
  3. A redacted version of your InfluxDB logs.
  4. The output of SHOW SUBSCRIPTIONS in InfluxDB
  5. The output of kapacitor show <task name> for the two tasks you have.