Creating a Downsampling Task with Conditions

Hello everyone,

I’m new to InfluxDB and currently facing a challenge with downsampling my stored data. Here’s the Flux task I’ve been working on:

option task = {name: "qm-downsampled-5m-mp", every: 1h}

from(bucket: "qm")
    |> range(start: -task.every)
    |> filter(fn: (r) => r["_measurement"] == "mp")
    |> filter(fn: (r) => r["_field"] == "temperature")
    |> aggregateWindow(every: 5m, fn: mean, createEmpty: false)
    |> yield(name: "mean")
    |> to(bucket: "qm_sampled_mp")

This task runs every hour and calculates the average temperature in 5-minute windows.

Now I want to modify this task so that it first checks whether there are more than 20 data points in each 5-minute interval (i.e. inside each 5-minute aggregateWindow window). If there are, it should calculate the mean of those points; if not, it should skip that interval.

Here’s an idea of what I’m looking to do, but I’m not sure how to implement it correctly:

option task = {name: "qm-downsampled-5m-mp", every: 1h}

from(bucket: "qm")
    |> range(start: -task.every)
    |> filter(fn: (r) => r["_measurement"] == "mp")
    |> filter(fn: (r) => r["_field"] == "temperature")
    |> aggregateWindow(every: 5m, fn: {if count() > 20 then mean else drop}, createEmpty: false) // pseudocode, not valid Flux
    |> yield(name: "mean")
    |> to(bucket: "qm_sampled_mp")

Can anyone guide me on how to adjust my script to achieve this?

Thank you very much for your help.

I’d be interested to see how it could be done with a custom function. However, you can do it with something like this:

raw_data = from(bucket: "bucket")
    |> range(start: v.timeRangeStart, stop: v.timeRangeStop)
    |> filter(fn: (r) => r["_measurement"] == "filtera")
    |> filter(fn: (r) => r["_field"] == "value")
    |> filter(fn: (r) => r["lien_mesure"] == "filterb")

count = raw_data
    |> aggregateWindow(every: v.windowPeriod, fn: count, createEmpty: false)
    |> keep(columns: ["_value", "_time", "_field"])
    |> set(key: "_field", value: "count")
    //|> yield(name: "count")

mean = raw_data
    |> aggregateWindow(every: v.windowPeriod, fn: mean, createEmpty: false)
    |> keep(columns: ["_value", "_time", "_field"])
    |> set(key: "_field", value: "mean")
    //|> yield(name: "mean")

union(tables: [count, mean])
    |> pivot(rowKey: ["_time"], columnKey: ["_field"], valueColumn: "_value")
    |> filter(fn: (r) => r.count >= 55) // keep only windows with at least 55 points
    |> drop(columns: ["count"])
    |> rename(columns: {mean: "_value"})
    |> yield(name: "join")
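
To make the intermediate shapes concrete, this is roughly what the rows look like at each stage (values are made up for illustration):

// after union(): two rows per window, distinguished by _field
//   _time                  _field   _value
//   2023-01-01T10:05:00Z   count    58
//   2023-01-01T10:05:00Z   mean     21.3

// after pivot(rowKey: ["_time"], columnKey: ["_field"], valueColumn: "_value"):
//   _time                  count   mean
//   2023-01-01T10:05:00Z   58      21.3

The final filter then simply tests the count column of each pivoted row.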

Thanks a lot for your reply and help, it almost works :sunglasses:

So what I currently have, based on your code, is the following:

raw_data =
    from(bucket: "qm")
        |> range(start: -24h, stop: -23h)
        |> filter(fn: (r) => r["_measurement"] == "mp")
        |> filter(fn: (r) => r["_field"] == "temp" or r["_field"] == "hum")

count =
    raw_data
        |> aggregateWindow(every: 5m, fn: count, createEmpty: false)
        |> set(key: "_type", value: "count")

mean =
    raw_data
        |> aggregateWindow(every: 5m, fn: mean, createEmpty: false)
        |> set(key: "_type", value: "mean")

What I don’t understand now is how to do the union and the pivot. My rows should also include the _field name (which in my case is either temp or hum).
In the end my rows should look something like this:

[_field, _time, _measurement, count, mean]

Again, thanks a lot for your help.

OK, so if you have two fields, you can do it like this (it may even be better than my first proposal):

raw_data = from(bucket: "sdfsdfs")
    |> range(start: v.timeRangeStart, stop: v.timeRangeStop)
    |> filter(fn: (r) => r["_measurement"] == "measurement")
    |> filter(fn: (r) => r["_field"] == "fieldvalue1" or r["_field"] == "fieldvalue2")

count = raw_data
    |> aggregateWindow(every: v.windowPeriod, fn: count, createEmpty: false)
    |> keep(columns: ["_value", "_time", "_field"])
    |> rename(columns: {_value: "count"})
    //|> yield(name: "count")

mean = raw_data
    |> aggregateWindow(every: v.windowPeriod, fn: mean, createEmpty: false)
    |> keep(columns: ["_value", "_time", "_field"])
    |> rename(columns: {_value: "mean"})
    //|> yield(name: "mean")

join(tables: {t1: count, t2: mean}, on: ["_time", "_field"])
    |> filter(fn: (r) => r.count >= 1) // adjust the threshold to taste (e.g. > 20 for your case)
    |> drop(columns: ["count"])
    |> rename(columns: {mean: "_value"})

Notice that this way you end up with two tables, one for temp and one for humidity, so if you already know that each window always has the same number of points, it’s not optimal. Along the same lines of optimisation, we could maybe window() the raw_data once and then apply count and mean to the windows, instead of running two aggregateWindow() calls.
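
A rough sketch of that idea, untested: aggregateWindow() is essentially window() + aggregate + duplicate() + window(every: inf) under the hood, so the windowing step can be shared. counts and means are just hypothetical names here; they would feed the same join as above.

windowed = raw_data
    |> window(every: v.windowPeriod)

counts = windowed
    |> count()
    |> duplicate(column: "_stop", as: "_time") // aggregates drop _time, so restore it from the window stop
    |> window(every: inf)                      // ungroup the windows again
    |> keep(columns: ["_value", "_time", "_field"])
    |> rename(columns: {_value: "count"})

means = windowed
    |> mean()
    |> duplicate(column: "_stop", as: "_time")
    |> window(every: inf)
    |> keep(columns: ["_value", "_time", "_field"])
    |> rename(columns: {_value: "mean"})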

I sometimes use union and sometimes join. join() auto-pivots and appends a suffix to column names when the same name appears in both tables, so sometimes it’s more convenient, sometimes not. That’s why I rename _value first, to avoid "type_1"/"value_1"-style column names in the result.
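
To illustrate the suffixing with two hypothetical streams that both still carry a _value column (counts and means are made-up names), I believe join behaves like this:

// counts: _time, _field, _value
// means:  _time, _field, _value
join(tables: {t1: counts, t2: means}, on: ["_time", "_field"])
// result columns: _time, _field, _value_t1, _value_t2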

(My test doesn’t keep the measurement name, and I also drop the count, but you can easily change that by playing with the keep()/drop() functions.)
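
For completeness, wiring this back into your original task could look something like the following. This is an untested sketch that reuses your bucket names and the > 20 threshold from your first post; note the set() call at the end, which restores the _measurement column that keep() dropped, since to() needs it:

option task = {name: "qm-downsampled-5m-mp", every: 1h}

raw_data = from(bucket: "qm")
    |> range(start: -task.every)
    |> filter(fn: (r) => r["_measurement"] == "mp")
    |> filter(fn: (r) => r["_field"] == "temp" or r["_field"] == "hum")

count = raw_data
    |> aggregateWindow(every: 5m, fn: count, createEmpty: false)
    |> keep(columns: ["_value", "_time", "_field"])
    |> rename(columns: {_value: "count"})

mean = raw_data
    |> aggregateWindow(every: 5m, fn: mean, createEmpty: false)
    |> keep(columns: ["_value", "_time", "_field"])
    |> rename(columns: {_value: "mean"})

join(tables: {t1: count, t2: mean}, on: ["_time", "_field"])
    |> filter(fn: (r) => r.count > 20) // only keep windows with more than 20 points
    |> drop(columns: ["count"])
    |> rename(columns: {mean: "_value"})
    |> set(key: "_measurement", value: "mp") // restore the measurement for to()
    |> to(bucket: "qm_sampled_mp")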

Thank you very much @thomasbromkapsdata, I appreciate your effort to help me :pray:
It’s exactly what I was looking for, and it works perfectly for what I’m doing. I was, and still am, a little confused about the difference between union and join, but I’ll read more about it in the docs. At least it works now; thanks again.

Andy