Aggregation of data arriving ”late” in kapacitor

We have a data stream with sends points every 1s per host and would like to aggregate this as mean rate over 10s interval for groups of hosts, differentiated by a tag (“bu”), and written to InfluxDB.

The challenge is that a handful of points may land in Kapacitor so late that they miss the initial 10s window where data is aggregated, creating a new window and ending up overwriting the previous point in InfluxDB.

I’ve tried various methods focused mainly around having another window to buffer the results from the first one but have failed to find a workable solution. Any help is greatly appreciated.

One constraint is that the amount of incoming data is significant (total amount of points is around 50k / sec) so the solution should be fairly resource-efficient.

Below is the best I’ve come up with thus far where the late points are dropped completely but is not really useful as we need to have all the points. Another solution would be to make the window larger but we would lose the 10s resolution which is also important.

var data = stream
  .where(lambda: isPresent("val"))
|where(lambda: unixNano(now()) - unixNano("time") < 10000000000)

have you ever got a solution/workaround for this?

Unfortunately not. The workaround of dropping old points has been working well enough in conjunction of reducing the tail latency of our metrics pipeline.

A more elegant workaround could be to do a relatively simple UDF that sets up a “fixed accumulating window” so all the data for a given fixed time period gets accumulated into a window which gets passed on only after it reaches a certain age.

Any data that arrives really late for that time period and misses the window is either dropped or written to a new window which has its timestamp is slightly jittered so that they will not overwrite the initial window. I think both approaches for managing old data can be useful depending a bit on how one wants to use it.

Thanks a lot @oplehto

Hello @Ashish_Sikarwar,
You might be interested in the execd processor plugin
telegraf/plugins/processors/execd at master · influxdata/telegraf · GitHub.

Thanks a lot @Anaisdg will try it.