Kapacitor: compare every point to rolling windows

Hi, I’m very new to Kapacitor and am having trouble grokking the documentation, so I’d like to know from you all if this is even possible, and if so, how?

What I’d like to do is evaluate every new point that arrives to influx and compare it to the preceding window. It’s a custom sigma function that looks at the recent window only, not the entire data set.

For example, with a temperature measurement every second, I’d like to compute the 5-second prior window of mean and stddev and compare each individual point to its mean and stddev. So, given a measurement like the first two columns (time and temp), I’d like to compute the sigma in the last column, defined as abs(t-mean_t)/stddev. To be clear, each new point should have its own 5-second mean and stddev.

A potential complication is that the data may not arrive every second. I’ve indicated these with blanks in the data below, but of course they wouldn’t actually be in the database. So a 5-second window may have fewer (or more) than 5 points.

time_s  temperature	five_second_mean	five_second_stdev	sigma
1	25			
2	24			
3	23			
4	24			
5	23	23.8	0.84	0.96
6	25	23.8	0.84	1.43
7	26	24.2	1.30	1.38
8				
9	35	27.3	5.32	1.46
10	24	27.5	5.07	0.69
11	25	27.5	5.07	0.49
12	26	27.5	5.07	0.30
13	27	27.4	4.39	0.09
14				
15	28	26.5	1.29	1.16
16	28	27.3	0.96	0.78
17	28	27.8	0.50	0.50
18	29	28.3	0.50	1.50
19	30	28.6	0.89	1.57
20	28	28.6	0.89	0.67
21	27	28.4	1.14	1.23
22	25	27.8	1.92	1.46
23	26	27.2	1.92	0.62

What I’m stuck on is getting a calculation that involves individual points and aggregate data.

I’ve tried some windowing/downsampling (though the code below doesn’t work) but these windows are calculated every 5s instead of every time a new point arrives. The rest is a mystery.

Thanks in advance for your help.

dbrp "project_4"."thirty_days"

var period = 5s
var interval = 5s

var data = stream
    |from()
        .database('project_4')
        .retentionPolicy('thirty_days')
        .measurement('wx_stations')
        .groupBy('station')
    |window()
        .period(period)
        .every(interval)
        
var mean_t = data
    |eval(lambda: mean('temperature'))
        .as('mean_t')
        .keep()
    
var stddev_t = data
    |eval(lambda: stddev('temperature'))
        .as('stddev_t')
        .keep()
        
mean_t
    |join(stddev_t)
        .as('mean_t', 'stddev_t')
    |influxDBOut()
        .database('debugging')
        .retentionPolicy('autogen')
        .measurement('debug')
        .precision('s')

Surely this isn’t such a strange use case?

On another note, has anyone found any supplemental documentation that can help with learning Kapacitor? I’m talking about basic conceptual stuff and how to navigate the language. I don’t mean to sound ungrateful for this free, open source software but the official documentation is pretty thin.

Did you ever find a solution? I feel like this would be a very common use case

Hello,

Did you find a solution? I have somehow similar thing to do. I have a stream with device’s battery voltage measurements that are updated once a day and I want to compute a voltage drop when new value arrives. So I need to take previous time window somehow and use it with the new time window. Any clue on how to achieve that?