Downsampling telegraf measurements with multiple fields


I’m trying to use Kapacitor for downsampling telegraf data, like with a CQ, but I’m running into some problems.

The basic setup works; the trick now is how to deal with measurements containing multiple values, say “net_tx” and “net_rx”, where I’d like to downsample to the max() of each, sort of like this but in TICK shape:

    SELECT max(net_tx) AS net_tx, max(net_rx) AS net_rx
    INTO telegraf."400d".net
    FROM telegraf."autogen".net
    GROUP BY time(5m)

What is the most sensible way to do this?

I found a few examples using join() such as this one but my attempts at it seem to consistently die on this error message:

invalid TICKscript: line 19 char 6: error calling func "join" on obj
*pipeline.InfluxQLNode: reflect: cannot use string as type pipeline.Node in

Test script:

dbrp "telegraf"."autogen"

var data = stream
        |from()
            .where(lambda: <something>)

var v1 = data |max('v1').as('v1')
var v2 = data |max('v2').as('v2')
var v3 = data |max('v3').as('v3')
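
The join I was aiming for would presumably be wired up with the nodes themselves, not field-name strings (which is what the “cannot use string as type pipeline.Node” error seems to complain about). Something like this, untested, with node and field names assumed from the script above:

    v1
        |join(v2, v3)
            .as('v1', 'v2', 'v3')
        |influxDBOut()
            .database('telegraf')
            .retentionPolicy('400d')
            .measurement('net')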


Anybody got any clues for me?

Hello @Tubemeister,
I don’t have an answer (I’m not a Kapacitor expert), but I’ll share your question with the right people. However, if you’re open to upgrading to the 2.0 release candidate, you could easily do this in a task. The following resource might be valuable to you. Your task would look something like:

    option task = {name: "Downsampling CPU", every: 1m}

    data = from(bucket: "my-bucket")
        |> range(start: -task.every)
        |> filter(fn: (r) => r._measurement == "my_measurement")

    data
        |> filter(fn: (r) => r._field == "v1")
        |> max()
        |> set(key: "max_type", value: "max_v1")
        |> to(bucket: "downsampled", org: "my-org", tagColumns: ["max_type"])

    data
        |> filter(fn: (r) => r._field == "v2")
        |> max()
        |> set(key: "max_type", value: "max_v2")
        |> to(bucket: "downsampled", org: "my-org", tagColumns: ["max_type"])

Definitely interested in 2.0 but it’s not a short-term upgrade yet. :slight_smile:

If the fields are all in the same measurement, you could try a batch task

Query the fields you want with the aggregations you require then write them back to the same database with a new retention policy. End result is one retention policy with all raw data and one retention policy with the aggregated data.

You can write it to a different database/measurement name if you want to. I’d have a go at writing the query for you, but I don’t know your data. As a tip: if you run Chronograf, you can build the query there and paste it into the TICK script.

    //Downsample data from the net measurement.
    batch
        |query('SELECT <your query> FROM "yourdatabase"."autogen"."network"')
            .groupBy(time(5m), *)
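
Fleshed out into a complete task it might look something like this. This is only a sketch: the field names, database, and retention policy are assumed from the rest of the thread, so adjust to your data:

    dbrp "telegraf"."autogen"

    batch
        |query('SELECT max("net_tx") AS net_tx, max("net_rx") AS net_rx FROM "telegraf"."autogen"."net"')
            .period(5m)
            .every(5m)
            .groupBy(time(5m), *)
        |influxDBOut()
            .database('telegraf')
            .retentionPolicy('400d')
            .measurement('net')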

As for doing this in Influx 2.0 - I’ve no idea, not tried it :slight_smile:
Hope that helps


I hadn’t tried batch yet as I wasn’t sure how it would scale to hundreds of tasks doing queries every 5m. :slight_smile:

For now I have ‘solved’ this by cheating: doing max(“net_tx”) selects a row that also happens to carry some value for “net_rx”. This is ugly, but it’s close enough for my current purpose and lets me move on.
