Downsampling telegraf measurements with multiple fields

Hi,

I’m trying to use Kapacitor for downsampling telegraf data, like with a CQ, but I’m running into some problems.

The basic setup works; the trick now is how to deal with measurements containing multiple values, say, “net_tx” and “net_rx”, where I’d like to downsample to the max() of each, sort of like this but in TICKscript form:

CREATE CONTINUOUS QUERY cq ON telegraf BEGIN  
    SELECT max(net_tx) AS net_tx, max(net_rx) AS net_rx
    INTO telegraf."400d".net FROM telegraf.autogen.net
    GROUP BY time(5m)
END

What is the most sensible way to do this?

I found a few examples using join() such as this one but my attempts at it seem to consistently die on this error message:

invalid TICKscript: line 19 char 6: error calling func "join" on obj
*pipeline.InfluxQLNode: reflect: cannot use string as type pipeline.Node in
Call

Test script:

dbrp "telegraf"."autogen"

var data = stream
    |from()
        .database('telegraf')
        .retentionPolicy('autogen')
        .measurement('foobar')
        .where(lambda: <something>)
        .groupBy(*)
    |window()
        .period(5m)
        .every(5m)
        .align()

var v1 = data |max('v1').as('v1')
var v2 = data |max('v2').as('v2')
var v3 = data |max('v3').as('v3')

v1
    |join('v2','v3')
        .as('v1','v2','v3')
    |influxDBOut()
        .cluster('writer')
        .database('telegraf')
        .retentionPolicy('400d')
        .measurement('foobar')
        .precision('s')

Anybody got any clues for me?

Hello @Tubemeister,
I don’t have an answer (I’m not a Kapacitor expert), but I’ll share your question with the right people. However, if you’re open to upgrading to the 2.0 release candidate, you could easily do this in a task. The following resource might be valuable to you: “Downsampling with InfluxDB v2.0” (InfluxData). Your task would look something like:

option task = {name: "Downsampling CPU", every: 1m}

data = from(bucket: "my-bucket")
    |> range(start: -task.every)
    |> filter(fn: (r) => r._measurement == "my_measurement")

// Max of field v1 over the task interval (filter on the field name, not the value)
data
    |> filter(fn: (r) => r._field == "v1")
    |> max()
    |> set(key: "max_type", value: "max_v1")
    |> to(bucket: "downsampled", org: "my-org", tagColumns: ["max_type"])

// Max of field v2 over the task interval
data
    |> filter(fn: (r) => r._field == "v2")
    |> max()
    |> set(key: "max_type", value: "max_v2")
    |> to(bucket: "downsampled", org: "my-org", tagColumns: ["max_type"])

Definitely interested in 2.0 but it’s not a short-term upgrade yet. :slight_smile:

If the fields are all in the same measurement, you could try a batch task.

Query the fields you want with the aggregations you require then write them back to the same database with a new retention policy. End result is one retention policy with all raw data and one retention policy with the aggregated data.

You can write it to a different database/measurement name if you want to. I’d have an attempt at providing the query but I don’t know your data. As a tip, if you run Chronograf you can build the query there and paste it into the TICKscript.

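// Downsample data from the net measurement.
// Replace <your query> with your SELECT clause; the whole statement is one single-quoted string.
batch
    |query('<your query> FROM "yourdatabase"."autogen"."network"')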
        .period(5m)
        .every(5m)
        .groupBy(time(5m), *)
        
    //|log()
    |influxDBOut()
        .database('database')
        .retentionPolicy('your_new_rp')
        .measurement('network')
        .precision('s')

As for doing this in Influx 2.0 - I’ve no idea, not tried it :slight_smile:
Hope that helps

I hadn’t tried batch yet as I wasn’t sure how it would scale to hundreds of tasks doing queries every 5m. :slight_smile:

For now I have ‘solved’ this by cheating: doing max(“net_tx”) selects the whole row with the highest “net_tx”, and that row also carries whatever value “net_rx” had at that point. This is ugly but it’s close enough for my current purpose and allows me to move on.
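
To make the difference concrete, here is a small Python sketch. It is not Kapacitor code, and the sample points and function names are made up purely for illustration. It compares a per-field max per 5m window (what the CQ computes) with keeping the single row that has the highest “net_tx”:

```python
from collections import defaultdict

WINDOW = 300  # 5 minutes, in seconds (assumed epoch-second timestamps)


def per_field_max(points, fields, window=WINDOW):
    """Max of each field independently per window, like max(a), max(b) in a CQ."""
    out = defaultdict(dict)
    for p in points:
        start = p["time"] - p["time"] % window  # start of the window this point falls in
        for f in fields:
            if f in p:
                cur = out[start].get(f)
                out[start][f] = p[f] if cur is None else max(cur, p[f])
    return dict(out)


def selector_row(points, field, window=WINDOW):
    """Keep the whole point whose `field` is largest in each window."""
    out = {}
    for p in points:
        start = p["time"] - p["time"] % window
        if start not in out or p[field] > out[start][field]:
            out[start] = p
    return out


# Hypothetical sample data: three points in the same 5m window.
points = [
    {"time": 0, "net_tx": 10, "net_rx": 90},
    {"time": 60, "net_tx": 50, "net_rx": 20},
    {"time": 120, "net_tx": 30, "net_rx": 40},
]

print(per_field_max(points, ["net_tx", "net_rx"]))
print(selector_row(points, "net_tx"))
```

With this sample, the per-field version yields net_rx = 90 for the window, while the selected row carries net_rx = 20, so the shortcut can under-report the second field whenever the two peaks fall on different points.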