We have a data stream that sends one point per second per host, and we would like to aggregate it as a mean rate over 10s intervals for groups of hosts, differentiated by a tag (“bu”), and write the result to InfluxDB.
The challenge is that a handful of points may arrive in Kapacitor so late that they miss the initial 10s window in which the data is aggregated. They then create a new window, and its result ends up overwriting the previous point in InfluxDB.
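To make the failure mode concrete, here is a minimal Python sketch of what I believe is happening. It is not Kapacitor code: it only assumes that the on-time points and the late point are aggregated separately, and that both results are written with the same series and timestamp, in which case InfluxDB keeps the last write. The names `write_point` and `store`, the series label `bu=a`, and the sample values are made up for illustration.

```python
def mean(values):
    """Arithmetic mean of a non-empty list of floats."""
    return sum(values) / len(values)


def write_point(store, series, timestamp, value):
    """Hypothetical stand-in for an InfluxDB write.

    A point with the same series and timestamp replaces the
    previous field value, so the last write wins.
    """
    store[(series, timestamp)] = value


# Nine points arrive on time for the window starting at t=0 ...
on_time = [10.0] * 9
# ... and one point for the same window arrives late.
late = [100.0]

store = {}
# First emission: the window closes with the on-time points only.
write_point(store, "bu=a", 0, mean(on_time))
# Second emission: the late point forms its own window with the same timestamp.
write_point(store, "bu=a", 0, mean(late))

stored = store[("bu=a", 0)]      # what ends up in the database
expected = mean(on_time + late)  # what we actually want for that interval

print("stored:", stored, "expected:", expected)
```

The stored value reflects only the late point, not the mean over all ten points, which is the overwrite described above.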
I’ve tried various approaches, mainly based on adding a second window to buffer the results from the first one, but have failed to find a workable solution. Any help is greatly appreciated.
One constraint is that the amount of incoming data is significant (around 50k points/sec in total), so the solution should be fairly resource-efficient.
Below is the best I’ve come up with so far. It drops the late points completely, which is not really useful since we need all the points. Another option would be to make the window larger, but then we would lose the 10s resolution, which is also important.
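// Per-host rate of change of "val", normalized to 1s.
var data = stream
    |from()
        .measurement('m')
        .groupBy('bu', 'host')
        .where(lambda: isPresent("val"))
        .round(1s)
    |derivative('val')
        .unit(1s)
        .nonNegative()
        .as('val')
    // Aligned 10s windows, emitted every 10s.
    |window()
        .period(10s)
        .every(10s)
        .align()
    // Drop points older than 10s (10000000000 ns); this is what discards the late points.
    |where(lambda: unixNano(now()) - unixNano("time") < 10000000000)
    // Mean rate per host, then summed per "bu".
    |mean('val')
    |groupBy('bu')
    |sum('mean')
    |influxDBOut()
        .measurement('faststats')
        .database('tg_udp')
        .retentionPolicy('tg_udp')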