I would like help figuring out why my streaming script is missing points.
I have a TICKscript that performs a custom sigma computation (the built-in sigma function uses the entire dataset to compute the mean and standard deviation, which I don't want), and I would like the computation to run on every point that hits the database. For each point, I want to compute the mean and standard deviation over the prior 60s and compare the point against those values.
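To make the intent concrete, here is the per-point computation I'm after, sketched in plain Python (the function name and structure are purely illustrative, not part of the task):

```python
from statistics import mean, stdev

def rolling_sigma(points, window_s=60):
    """points: list of (epoch_seconds, value) in time order.
    For each point, compare it to the mean and standard deviation of the
    points in the preceding window_s seconds (including the point itself)."""
    out = []
    for i, (t, value) in enumerate(points):
        window = [v for (tw, v) in points[: i + 1] if t - tw <= window_s]
        if len(window) < 2:
            continue  # stdev() needs at least two samples
        m, s = mean(window), stdev(window)
        out.append((t, abs(value - m) / s if s else 0.0))
    return out
```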
I set up a dummy data emitter that writes a point to InfluxDB every 3 seconds:
name: observations_test_dm
time station var_767
---- ------- -------
2018-06-15T17:58:43-07:00 wx1 301.1
2018-06-15T17:58:40-07:00 wx1 300.9
2018-06-15T17:58:37-07:00 wx1 300.6
2018-06-15T17:58:34-07:00 wx1 300.4
2018-06-15T17:58:31-07:00 wx1 300.2
2018-06-15T17:58:28-07:00 wx1 300
2018-06-15T17:58:24-07:00 wx1 299.8
2018-06-15T17:58:21-07:00 wx1 299.6
2018-06-15T17:58:18-07:00 wx1 299.5
2018-06-15T17:58:15-07:00 wx1 299.3
2018-06-15T17:58:12-07:00 wx1 299.1
2018-06-15T17:58:09-07:00 wx1 298.8
2018-06-15T17:58:06-07:00 wx1 298.6
2018-06-15T17:58:03-07:00 wx1 298.3
2018-06-15T17:58:00-07:00 wx1 298.1
2018-06-15T17:57:57-07:00 wx1 297.9
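The emitter is roughly the following (a sketch using the influxdb Python client; the database, measurement, tag, and field names match the task, while the host, port, and drift logic are just placeholders):

```python
import random
import time

from influxdb import InfluxDBClient

client = InfluxDBClient(host='localhost', port=8086, database='project_4')

value = 297.9
while True:
    point = {
        'measurement': 'observations_test_dm',
        'tags': {'station': 'wx1'},
        'fields': {'var_767': value},
    }
    client.write_points([point], retention_policy='thirty_days', time_precision='s')
    value += random.uniform(0.1, 0.3)  # slow upward drift, like the sample above
    time.sleep(3)
```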
My TICKscript below does the computation and stores the results in another measurement:
ID: sigma_var_767_station_wx1
Error:
Template:
Type: stream
Status: enabled
Executing: true
Created: 15 Jun 18 15:04 PDT
Modified: 15 Jun 18 17:30 PDT
LastEnabled: 15 Jun 18 17:30 PDT
Databases Retention Policies: ["project_4"."thirty_days"]
TICKscript:
dbrp "project_4"."thirty_days"
var crit_message = 'Critical sigma level'
var warn_message = 'Warning sigma level'
var ok_message = 'OK'
var data = stream
|from()
.database('project_4')
.retentionPolicy('thirty_days')
.measurement('observations_test_dm')
.groupBy(*)
.where(lambda: "station" == 'wx1')
|window()
.period(60s)
// .align()
var mean_field = data
|mean('var_767')
.as('mean_field')
var last_field = data
|last('var_767')
.as('last_field')
var stddev_field = data
|stddev('var_767')
.as('stddev_field')
var sigma_field = mean_field
|join(last_field, stddev_field)
.as('A', 'B', 'C')
|eval(lambda: abs("B.last_field" - "A.mean_field") / "C.stddev_field", lambda: 60s, lambda: "B.last_field")
.as('sigma_var_767', 'sigma_window_var_767', 'value_var_767')
sigma_field
|influxDBOut()
.create()
.database('debugging')
.retentionPolicy('autogen')
.measurement('sigma_debug')
.precision('s')
.tag('variable', 'var_767')
DOT:
digraph sigma_var_767_station_wx1 {
graph [throughput="0.00 points/s"];
stream0 [avg_exec_time_ns="0s" errors="0" working_cardinality="0" ];
stream0 -> from1 [processed="5074"];
from1 [avg_exec_time_ns="6.162µs" errors="0" working_cardinality="0" ];
from1 -> window2 [processed="362"];
window2 [avg_exec_time_ns="11.028µs" errors="0" working_cardinality="1" ];
window2 -> stddev5 [processed="362"];
window2 -> last4 [processed="362"];
window2 -> mean3 [processed="362"];
stddev5 [avg_exec_time_ns="2.772µs" errors="0" working_cardinality="1" ];
stddev5 -> join8 [processed="362"];
last4 [avg_exec_time_ns="2.926µs" errors="0" working_cardinality="1" ];
last4 -> join8 [processed="362"];
mean3 [avg_exec_time_ns="1.886µs" errors="0" working_cardinality="1" ];
mean3 -> join8 [processed="362"];
join8 [avg_exec_time_ns="5.976µs" errors="0" working_cardinality="1" ];
join8 -> eval9 [processed="362"];
eval9 [avg_exec_time_ns="20.206µs" errors="0" working_cardinality="1" ];
eval9 -> influxdb_out10 [processed="362"];
influxdb_out10 [avg_exec_time_ns="7.141µs" errors="1" points_written="358" working_cardinality="0" write_errors="1" ];
}
The problem is the roughly 25-30 second gap between 2018-06-15T17:58:06 and 2018-06-15T17:58:31 in the output below. This happens every minute: the output stops a couple of seconds past the top of the minute and then resumes about 30 seconds later.
name: sigma_debug
time station sigma_var_767 sigma_window_var_767 value_var_767
---- ------- ------------- -------------------- -------------
2018-06-15T17:58:43-07:00 wx1 1.478272110646711 1m0s 301.1
2018-06-15T17:58:40-07:00 wx1 1.631503671561765 1m0s 300.9
2018-06-15T17:58:37-07:00 wx1 1.769518906665904 1m0s 300.6
2018-06-15T17:58:34-07:00 wx1 2.0464387313378403 1m0s 300.4
2018-06-15T17:58:31-07:00 wx1 2.506105720008151 1m0s 300.2
2018-06-15T17:58:06-07:00 wx1 1.63789511933491 1m0s 298.6
2018-06-15T17:58:03-07:00 wx1 1.0241357962138484 1m0s 298.3
2018-06-15T17:58:00-07:00 wx1 0.94931009421774 1m0s 298.1
2018-06-15T17:57:57-07:00 wx1 0.952895491743401 1m0s 297.9
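To double-check that the gap in the table above really recurs every minute over a longer run, I look at the spacing between consecutive output points with something like this (again a Python sketch against the debug measurement; host and port are placeholders, and it assumes the default ascending time order):

```python
from influxdb import InfluxDBClient

client = InfluxDBClient(host='localhost', port=8086, database='debugging')
result = client.query('SELECT value_var_767 FROM "autogen"."sigma_debug"', epoch='s')
times = [p['time'] for p in result.get_points()]

# Flag any spacing between consecutive output points well above the 3 s emit rate.
for prev, cur in zip(times, times[1:]):
    if cur - prev > 5:
        print('%ds gap ending at %d' % (cur - prev, cur))
```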
I suspect this has something to do with a Kapacitor buffer filling up, or something like that? What makes me suspicious is that the same thing happens when data arrive every 1 second, 2 seconds, or 8 seconds.
It’s critical that I am able to evaluate every point that enters the database. Where am I going wrong?