Kapacitor stream process skipping points

kapacitor

I would like help figuring out why my streaming script is missing points.

I have a TICKscript that performs a custom sigma computation. (The existing sigma function uses the entire dataset to compute the mean and standard deviation, and I don’t want that.) I would like the computation to be performed on every point that hits the database: for each point, I’d like to compute the mean and standard deviation over the prior 60 seconds and compare the point to those values.
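To make the intended calculation concrete, here is a rough sketch of it outside Kapacitor, in plain Python. The function name `rolling_sigma` and the `(seconds, value)` input shape are made up for illustration. It also assumes the sample standard deviation and that the current point is excluded from its own window, which may not match what the TICKscript below actually does.

```python
from collections import deque
from statistics import mean, stdev


def rolling_sigma(points, window_s=60.0):
    """Yield (t, sigma) for every input point.

    points   -- iterable of (t_seconds, value) pairs, sorted by time
    window_s -- how far back (in seconds) the prior window reaches

    sigma is None when the prior window holds fewer than two points
    or when its standard deviation is zero.
    """
    prior = deque()  # points seen so far that are still inside the window
    for t, value in points:
        # Drop points that are older than window_s relative to this point.
        while prior and prior[0][0] < t - window_s:
            prior.popleft()
        values = [v for _, v in prior]
        sigma = None
        if len(values) >= 2:
            sd = stdev(values)  # sample standard deviation (assumption)
            if sd > 0:
                sigma = abs(value - mean(values)) / sd
        yield t, sigma
        prior.append((t, value))


if __name__ == "__main__":
    # Hypothetical samples, one every 3 seconds.
    samples = [(0, 297.9), (3, 298.1), (6, 298.3), (9, 298.6)]
    for t, sigma in rolling_sigma(samples):
        print(t, sigma)
```

The point of the sketch is that exactly one result comes out for each input point, which is the behavior I am after.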

I set up a dummy data emitter that writes data to InfluxDB every 3 seconds:

name: observations_test_dm
time                      station var_767
----                      ------- -------
2018-06-15T17:58:43-07:00 wx1     301.1
2018-06-15T17:58:40-07:00 wx1     300.9
2018-06-15T17:58:37-07:00 wx1     300.6
2018-06-15T17:58:34-07:00 wx1     300.4
2018-06-15T17:58:31-07:00 wx1     300.2
2018-06-15T17:58:28-07:00 wx1     300
2018-06-15T17:58:24-07:00 wx1     299.8
2018-06-15T17:58:21-07:00 wx1     299.6
2018-06-15T17:58:18-07:00 wx1     299.5
2018-06-15T17:58:15-07:00 wx1     299.3
2018-06-15T17:58:12-07:00 wx1     299.1
2018-06-15T17:58:09-07:00 wx1     298.8
2018-06-15T17:58:06-07:00 wx1     298.6
2018-06-15T17:58:03-07:00 wx1     298.3
2018-06-15T17:58:00-07:00 wx1     298.1
2018-06-15T17:57:57-07:00 wx1     297.9

My TICKscript, shown below, does the computation and stores the results in another measurement:

ID: sigma_var_767_station_wx1
Error:
Template:
Type: stream
Status: enabled
Executing: true
Created: 15 Jun 18 15:04 PDT
Modified: 15 Jun 18 17:30 PDT
LastEnabled: 15 Jun 18 17:30 PDT
Databases Retention Policies: ["project_4"."thirty_days"]
TICKscript:
dbrp "project_4"."thirty_days"

var crit_message = 'Critical sigma level'

var warn_message = 'Warning sigma level'

var ok_message = 'OK'

var data = stream
    |from()
        .database('project_4')
        .retentionPolicy('thirty_days')
        .measurement('observations_test_dm')
        .groupBy(*)
        .where(lambda: "station" == 'wx1')
    |window()
        .period(60s)
    // .align()
    
var mean_field = data
    |mean('var_767')
        .as('mean_field')

var last_field = data
    |last('var_767')
        .as('last_field')

var stddev_field = data
    |stddev('var_767')
        .as('stddev_field')

var sigma_field = mean_field
    |join(last_field, stddev_field)
        .as('A', 'B', 'C')
    |eval(lambda: abs("B.last_field" - "A.mean_field") / "C.stddev_field", lambda: 60s, lambda: "B.last_field")
        .as('sigma_var_767', 'sigma_window_var_767', 'value_var_767')

sigma_field
    |influxDBOut()
        .create()
        .database('debugging')
        .retentionPolicy('autogen')
        .measurement('sigma_debug')
        .precision('s')
        .tag('variable', 'var_767')

DOT:
digraph sigma_var_767_station_wx1 {
graph [throughput="0.00 points/s"];

stream0 [avg_exec_time_ns="0s" errors="0" working_cardinality="0" ];
stream0 -> from1 [processed="5074"];

from1 [avg_exec_time_ns="6.162µs" errors="0" working_cardinality="0" ];
from1 -> window2 [processed="362"];

window2 [avg_exec_time_ns="11.028µs" errors="0" working_cardinality="1" ];
window2 -> stddev5 [processed="362"];
window2 -> last4 [processed="362"];
window2 -> mean3 [processed="362"];

stddev5 [avg_exec_time_ns="2.772µs" errors="0" working_cardinality="1" ];
stddev5 -> join8 [processed="362"];

last4 [avg_exec_time_ns="2.926µs" errors="0" working_cardinality="1" ];
last4 -> join8 [processed="362"];

mean3 [avg_exec_time_ns="1.886µs" errors="0" working_cardinality="1" ];
mean3 -> join8 [processed="362"];

join8 [avg_exec_time_ns="5.976µs" errors="0" working_cardinality="1" ];
join8 -> eval9 [processed="362"];

eval9 [avg_exec_time_ns="20.206µs" errors="0" working_cardinality="1" ];
eval9 -> influxdb_out10 [processed="362"];

influxdb_out10 [avg_exec_time_ns="7.141µs" errors="1" points_written="358" working_cardinality="0" write_errors="1" ];
}

The problem is the roughly 25-30 second gap between 2018-06-15T17:58:06 and 2018-06-15T17:58:31. This happens every minute: the output stops a couple of seconds past the minute, then resumes roughly 30 seconds later.

name: sigma_debug
time                      station sigma_var_767       sigma_window_var_767 value_var_767
----                      ------- -------------       -------------------- -------------
2018-06-15T17:58:43-07:00 wx1     1.478272110646711   1m0s                 301.1
2018-06-15T17:58:40-07:00 wx1     1.631503671561765   1m0s                 300.9
2018-06-15T17:58:37-07:00 wx1     1.769518906665904   1m0s                 300.6
2018-06-15T17:58:34-07:00 wx1     2.0464387313378403  1m0s                 300.4
2018-06-15T17:58:31-07:00 wx1     2.506105720008151   1m0s                 300.2
2018-06-15T17:58:06-07:00 wx1     1.63789511933491    1m0s                 298.6
2018-06-15T17:58:03-07:00 wx1     1.0241357962138484  1m0s                 298.3
2018-06-15T17:58:00-07:00 wx1     0.94931009421774    1m0s                 298.1
2018-06-15T17:57:57-07:00 wx1     0.952895491743401   1m0s                 297.9
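For anyone who wants to check the gap mechanically, a small sketch like the following lists the input timestamps that have no matching output row. The timestamps are copied from the two query results above. The helper name `missing_timestamps` is made up, and the plain string comparison only works because every timestamp uses the same format and UTC offset.

```python
def missing_timestamps(input_times, output_times):
    """Return the input timestamps that never appear in the output, oldest first."""
    seen = set(output_times)
    # Lexicographic order equals time order for same-format, same-offset ISO strings.
    return sorted(t for t in input_times if t not in seen)


# Timestamps from observations_test_dm (input measurement).
input_times = [
    "2018-06-15T17:58:43-07:00", "2018-06-15T17:58:40-07:00",
    "2018-06-15T17:58:37-07:00", "2018-06-15T17:58:34-07:00",
    "2018-06-15T17:58:31-07:00", "2018-06-15T17:58:28-07:00",
    "2018-06-15T17:58:24-07:00", "2018-06-15T17:58:21-07:00",
    "2018-06-15T17:58:18-07:00", "2018-06-15T17:58:15-07:00",
    "2018-06-15T17:58:12-07:00", "2018-06-15T17:58:09-07:00",
    "2018-06-15T17:58:06-07:00", "2018-06-15T17:58:03-07:00",
    "2018-06-15T17:58:00-07:00", "2018-06-15T17:57:57-07:00",
]

# Timestamps from sigma_debug (output measurement).
output_times = [
    "2018-06-15T17:58:43-07:00", "2018-06-15T17:58:40-07:00",
    "2018-06-15T17:58:37-07:00", "2018-06-15T17:58:34-07:00",
    "2018-06-15T17:58:31-07:00", "2018-06-15T17:58:06-07:00",
    "2018-06-15T17:58:03-07:00", "2018-06-15T17:58:00-07:00",
    "2018-06-15T17:57:57-07:00",
]

missing = missing_timestamps(input_times, output_times)
for t in missing:
    print(t)
```

For the rows shown here, this reports seven input points with no output, from 17:58:09 through 17:58:28.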

I suspect this has something to do with Kapacitor’s buffer filling up, but I’m not sure. It’s suspicious because the same thing happens whether data arrive at the database every 1, 2, or 8 seconds.

It’s critical that I am able to evaluate every point that enters the database. Where am I going wrong?