Aggregating min and max for many fields

Hi,

I’m Dario from Bochum, Germany, and new to InfluxDB 2.x (migrating from InfluxDB 1.x).

I store data in a bucket, and now I want to aggregate that data using a task that will be called once every day.

To test it, I have one year of raw data in the test bucket “sensors”,
and I want to aggregate it into a bucket with infinite retention called “forever”.
This works as expected for

  • mean values (4h window)
  • min and max values (12h window)

with these queries:

from(bucket: "sensors")
  |> range(start: -365d)
  |> filter(fn: (r) => r["_measurement"] == "Weather")
  |> filter(fn: (r) => r["_field"] == "SolarRadiation")
  |> aggregateWindow(every: 4h, fn: mean)
  |> set(key: "_field", value: "SolarRadiation_mean")
  |> to(bucket: "forever")  

from(bucket: "sensors")
  |> range(start: -365d)
  |> filter(fn: (r) => r["_measurement"] == "Weather")
  |> filter(fn: (r) => r["_field"] == "SolarRadiation")
  |> aggregateWindow(every: 12h, fn: max)
  |> set(key: "_field", value: "SolarRadiation_max")
  |> to(bucket: "forever")  

from(bucket: "sensors")
  |> range(start: -365d)
  |> filter(fn: (r) => r["_measurement"] == "Weather")
  |> filter(fn: (r) => r["_field"] == "SolarRadiation")
  |> aggregateWindow(every: 12h, fn: min)
  |> set(key: "_field", value: "SolarRadiation_min")
  |> to(bucket: "forever")  

For the task, I would change “range(start: -365d)” to “range(start: -task.every)”.
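As a rough sketch of what the mean query could look like as a daily task: the task name below is made up for illustration, and the 1d interval is taken from the "once every day" requirement above. Everything else is the first query unchanged.

```flux
// Hypothetical task name; "every: 1d" runs the task once per day.
option task = {name: "weather-solarradiation-mean", every: 1d}

from(bucket: "sensors")
    // Only look at the data written since the previous run.
    |> range(start: -task.every)
    |> filter(fn: (r) => r["_measurement"] == "Weather")
    |> filter(fn: (r) => r["_field"] == "SolarRadiation")
    |> aggregateWindow(every: 4h, fn: mean)
    |> set(key: "_field", value: "SolarRadiation_mean")
    |> to(bucket: "forever")
```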

But when I want to do this for many more fields, like:

  • Temperature
  • Humidity
  • Pressure
  • WindSpeed
  • WindDirection
  • GustSpeed
  • Rainrate
  • GoldcapVoltage
  • RainClicks

and only max values for

  • RainToday
  • Longest Blackout
  • Packets received
  • CRC-Errors
  • BattWarning

I will end up with 3*10 + 5 = 35 queries = 35 tasks.

I was wondering if there is an elegant method to achieve this goal.

Hoping to hear an elegant solution

Greetings from Bochum.

Hello @dondario,
Do you really need to rename everything, or would it be okay not to? That’ll simplify the query.
You can do:

data = from(bucket: "sensors")
  |> range(start: -365d)
  |> filter(fn: (r) => r["_measurement"] == "Weather")
  |> filter(fn: (r) => r["_field"] == "SolarRadiation" or r["_field"] == "Temperature" or r["_field"] == "Pressure" or ...)
data
  |> aggregateWindow(every: 4h, fn: mean)
  |> to(bucket: "forever")  


data
  |> aggregateWindow(every: 4h, fn: min)
  |> to(bucket: "forever")  

data
  |> aggregateWindow(every: 4h, fn: max)
  |> to(bucket: "forever")  
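One caveat if the names are left unchanged: all three pipelines above write the same measurement, field names and window timestamps into “forever”, and InfluxDB keeps only the last value written for an identical series and timestamp. A possible way around that, sketched here under the assumption that a suffix on the original field name is acceptable, is to append the suffix with map() instead of calling set() once per field. The suffix strings and the 12h window for min and max are taken from the original question.

```flux
data = from(bucket: "sensors")
    |> range(start: -365d)
    |> filter(fn: (r) => r["_measurement"] == "Weather")

// Each pipeline appends its own suffix, so e.g. Temperature becomes
// Temperature_mean, Temperature_min and Temperature_max.
data
    |> aggregateWindow(every: 4h, fn: mean)
    |> map(fn: (r) => ({r with _field: r._field + "_mean"}))
    |> to(bucket: "forever")

data
    |> aggregateWindow(every: 12h, fn: min)
    |> map(fn: (r) => ({r with _field: r._field + "_min"}))
    |> to(bucket: "forever")

data
    |> aggregateWindow(every: 12h, fn: max)
    |> map(fn: (r) => ({r with _field: r._field + "_max"}))
    |> to(bucket: "forever")
```

This assumes every selected field is numeric; mean will fail on string or boolean fields, so those would need to be filtered out first.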

Alternatively if you are querying every field in your measurement you can just do:

data = from(bucket: "sensors")
  |> range(start: -365d)
  |> filter(fn: (r) => r["_measurement"] == "Weather")

Lastly, if you’re applying these aggregates to every field except a couple, you can do:

from(bucket: "sensors")
  |> range(start: -365d)
  |> filter(fn: (r) => r["_measurement"] == "Weather")
  |> filter(fn: (r) => r["_field"] != "only field you don't want to calculate the mean for")
  |> aggregateWindow(every: 4h, fn: mean)
  // Append a suffix per field; set() here would rename every field to the same name
  |> map(fn: (r) => ({r with _field: r._field + "_mean"}))
  |> to(bucket: "forever")  

But yes, you’ll want to apply an aggregateWindow to all of your data for each aggregation, so the general structure will be the same.

However the following might interest you:

minMaxMean = (tables=<-) =>
  tables
    |> reduce(
      identity: {count: 0.0, sum: 0.0, min: 0.0, max: 0.0, mean: 0.0},
      fn: (r, accumulator) => ({
        count: accumulator.count + 1.0,
        sum: r._value + accumulator.sum,
        min: if accumulator.count == 0.0 then r._value else if r._value < accumulator.min then r._value else accumulator.min,
        max: if accumulator.count == 0.0 then r._value else if r._value > accumulator.max then r._value else accumulator.max,
        mean: (r._value + accumulator.sum) / (accumulator.count + 1.0)
      })
    )
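A hedged usage sketch for the function above, repeated here so the snippet stands on its own. reduce() produces one row per input table, so windowing the data first should give one row per window and field. This assumes the field values are floats, since the identity record uses float values.

```flux
minMaxMean = (tables=<-) =>
    tables
        |> reduce(
            identity: {count: 0.0, sum: 0.0, min: 0.0, max: 0.0, mean: 0.0},
            fn: (r, accumulator) => ({
                count: accumulator.count + 1.0,
                sum: r._value + accumulator.sum,
                min: if accumulator.count == 0.0 then r._value else if r._value < accumulator.min then r._value else accumulator.min,
                max: if accumulator.count == 0.0 then r._value else if r._value > accumulator.max then r._value else accumulator.max,
                mean: (r._value + accumulator.sum) / (accumulator.count + 1.0),
            }),
        )

from(bucket: "sensors")
    |> range(start: -365d)
    |> filter(fn: (r) => r["_measurement"] == "Weather")
    |> filter(fn: (r) => r["_field"] == "SolarRadiation")
    // Split each series into 12h tables so reduce() runs once per window.
    |> window(every: 12h)
    |> minMaxMean()
```

The result keeps the group key columns (such as _start, _stop, _measurement and _field) plus count, sum, min, max and mean. There is no _time or _value column any more, so it would need reshaping before it could be written with to().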

And the following posts:

But I’d still use three separate aggregateWindow calls.