Multiple aggregations min/mean/max *per day*

Similar to another question (Multiple Aggregation Projections in fluxql).

That question does not seem to aggregate per (e.g.) day, so I assume it will get the min/mean/max for all data.

I have heart rate data (_field: min, avg, max), and I want to aggregate them per day.
The default query uses “mean” for all of them:

from(bucket: "healthdata")
  |> range(start: v.timeRangeStart, stop: v.timeRangeStop)
  |> filter(fn: (r) => r["_measurement"] == "heart_rate")
  |> filter(fn: (r) => r["_field"] == "avg" or r["_field"] == "max" or r["_field"] == "min")
  |> aggregateWindow(every: 1d, fn: mean, createEmpty: false)
  |> yield(name: "mean")

What I want, aggregated per day, is the min of _field: min, the max of _field: max, and the mean of _field: avg.

Suggestions?

Hello @wvk,
You can do the following:

common = from(bucket: "healthdata")
  |> range(start: v.timeRangeStart, stop: v.timeRangeStop)
  |> filter(fn: (r) => r["_measurement"] == "heart_rate")

minHeart = common
  |> filter(fn: (r) => r["_field"] == "min")
  |> aggregateWindow(every: 1d, fn: min, createEmpty: false)
  |> yield(name: "min")

maxHeart = common
  |> filter(fn: (r) => r["_field"] == "max")
  |> aggregateWindow(every: 1d, fn: max, createEmpty: false)
  |> yield(name: "max")

avgHeart = common
  |> filter(fn: (r) => r["_field"] == "avg")
  |> aggregateWindow(every: 1d, fn: mean, createEmpty: false)
  |> yield(name: "mean")

When using variables, try not to give a variable the same name as a Flux function, to avoid naming conflicts.
You can pass any selector or aggregate function as the fn parameter of aggregateWindow().

Please let me know if you need more help!

Thanks.

Thanks @Anaisdg !
What is the step to combine the three? I see union / join / map in other examples, but I'm not sure what it would look like in this case.

Hello @wvk,
Why do you want to combine all three?
What are you looking to do after you combine them?

As you can probably tell, I am a novice. I would like to make one request from my application to InfluxDB and get a single response with the various metrics, so I thought I needed to combine them.

I don’t think you need to combine them. The query above will produce all of your results; they’ll just be in different tables.
Are you using a client library? They usually have examples for how to iterate through responses to get the values from all of the tables. I would suggest trying that first.

You can also use array.from() to construct a final table:

import "array"
common = from(bucket: "healthdata")
  |> range(start: v.timeRangeStart, stop: v.timeRangeStop)
  |> filter(fn: (r) => r["_measurement"] == "heart_rate")

// findRecord() returns a single record: with idx: 0 this is the first row
// of the first table, i.e. only the first daily window.
minHeart = common
  |> filter(fn: (r) => r["_field"] == "min")
  |> aggregateWindow(every: 1d, fn: min, createEmpty: false)
  |> findRecord(fn: (key) => true, idx: 0)

maxHeart = common
  |> filter(fn: (r) => r["_field"] == "max")
  |> aggregateWindow(every: 1d, fn: max, createEmpty: false)
  |> findRecord(fn: (key) => true, idx: 0)

avgHeart = common
  |> filter(fn: (r) => r["_field"] == "avg")
  |> aggregateWindow(every: 1d, fn: mean, createEmpty: false)
  |> findRecord(fn: (key) => true, idx: 0)

array.from(rows: [{_time: now(), min: minHeart._value, max: maxHeart._value, mean: avgHeart._value}])

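@wvk but yes, you can also use union(). It takes streams of tables, so use the minHeart, maxHeart, and avgHeart definitions without the findRecord() step: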

union(tables: [minHeart, maxHeart, avgHeart])
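
If the goal is one row per day with min, max, and mean side by side, a minimal sketch is to union the three windowed streams and then pivot on _field. That output shape is an assumption; it has not been confirmed in this thread. The sketch assumes minHeart, maxHeart, and avgHeart are still streams of tables (no yield() or findRecord() needed), and that the three fields carry the same tags so that they land in the same rows:

```flux
common = from(bucket: "healthdata")
    |> range(start: v.timeRangeStart, stop: v.timeRangeStop)
    |> filter(fn: (r) => r["_measurement"] == "heart_rate")

// One windowed stream per field, each with its own aggregate.
minHeart = common
    |> filter(fn: (r) => r["_field"] == "min")
    |> aggregateWindow(every: 1d, fn: min, createEmpty: false)

maxHeart = common
    |> filter(fn: (r) => r["_field"] == "max")
    |> aggregateWindow(every: 1d, fn: max, createEmpty: false)

avgHeart = common
    |> filter(fn: (r) => r["_field"] == "avg")
    |> aggregateWindow(every: 1d, fn: mean, createEmpty: false)

// Merge the three streams, then turn each _field value into its own column.
union(tables: [minHeart, maxHeart, avgHeart])
    |> pivot(rowKey: ["_time"], columnKey: ["_field"], valueColumn: "_value")
```

Each output row should then hold a _time (by default the end of the daily window) plus min, max, and avg columns.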

Hello, @Anaisdg, please allow me to ask a (perhaps silly) question:

How come, in your original answer, the three pipelines after the first one (minHeart = …, maxHeart = … and avgHeart = …) all get the output of the first pipeline (common = …) as their input, even though common isn’t explicitly used there?

Best regards!

Hello @Gnoudini,
Sorry, it should be. I have edited it.


Hello, @Anaisdg

import "array"
common = from(bucket: "GIN")
  |> range(start: v.timeRangeStart, stop: v.timeRangeStop)
  |> filter(fn: (r) => r["_measurement"] == "IPE")

minHeart = common
  |> filter(fn: (r) => r["_field"] == "Global")
  |> aggregateWindow(every: 1m, fn: min, createEmpty: false)
  |> findRecord(fn: (key) => true, idx: 0)

maxHeart = common
  |> filter(fn: (r) => r["_field"] == "Global")
  |> aggregateWindow(every: 1m, fn: max, createEmpty: false)
  |> findRecord(fn: (key) => true, idx: 0)

avgHeart = common
  |> filter(fn: (r) => r["_field"] == "Global")
  |> aggregateWindow(every: 1m, fn: mean, createEmpty: false)
  |> findRecord(fn: (key) => true, idx: 0)

array.from(rows: [{_time: now(), min: minHeart._value, max: maxHeart._value, mean: avgHeart._value}])

I want to obtain the max, min and mean values in this table.

The values I’m supposed to get are 28.8 (min), 36.7(max) and 32.7(mean).

Thanks in advance for your help.
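
A possible explanation, judging from the query alone: aggregateWindow(every: 1m, …) followed by findRecord(idx: 0) returns only the value of the first one-minute window, not a value for the whole range. Below is a minimal sketch for a single min, max, and mean over the entire selected time range. It assumes the filters leave exactly one series, and minRec, maxRec, and meanRec are illustrative names:

```flux
import "array"

common = from(bucket: "GIN")
    |> range(start: v.timeRangeStart, stop: v.timeRangeStop)
    |> filter(fn: (r) => r["_measurement"] == "IPE")
    |> filter(fn: (r) => r["_field"] == "Global")

// min(), max(), and mean() without windowing reduce each table to one row,
// so findRecord(idx: 0) picks up the value for the whole range.
minRec = common
    |> min()
    |> findRecord(fn: (key) => true, idx: 0)

maxRec = common
    |> max()
    |> findRecord(fn: (key) => true, idx: 0)

meanRec = common
    |> mean()
    |> findRecord(fn: (key) => true, idx: 0)

array.from(rows: [{_time: now(), min: minRec._value, max: maxRec._value, mean: meanRec._value}])
```

If the data contains several series (different tag values), each series is a separate table and findRecord() would read only the first one.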

Why does this not work in influxdb:2.4?

import "array"

result = from(bucket: "example02")
  |> range(start: v.timeRangeStart, stop: v.timeRangeStop)
  |> filter(fn: (r) => r["_measurement"] == "quota")
  |> filter(fn: (r) => r["_field"] == "value")
  |> filter(fn: (r)=>r["deviceId"] =="client_123")
  |> filter(fn: (r)=>r["quotaId"] == "1")


minV = result
    |> aggregateWindow(every: 15m, fn: min, createEmpty: false)
    
maxV = result
    |> aggregateWindow(every: 15m, fn: max, createEmpty: false)

meanV = result
    |> aggregateWindow(every: 15m, fn: mean, createEmpty: false)


array.from(rows:[{_time:now(),min:minV._value}])

@huanhuangongzi
I’m not sure. What error are you getting?
I also like to put

|> yield(name: "like print statement 1")
or 
|> yield(name: "like print statement 2")

after certain lines of the query to see exactly where it’s failing and what shape the data has at that point.
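
One probable cause, judging from the query alone: minV is a stream of tables, not a record, so minV._value cannot be evaluated. The earlier array.from() example extracts a record with findRecord() first. Here is a minimal sketch along those lines. Like that example, it returns only the first 15-minute window, and minRec is an illustrative name:

```flux
import "array"

result = from(bucket: "example02")
    |> range(start: v.timeRangeStart, stop: v.timeRangeStop)
    |> filter(fn: (r) => r["_measurement"] == "quota")
    |> filter(fn: (r) => r["_field"] == "value")
    |> filter(fn: (r) => r["deviceId"] == "client_123")
    |> filter(fn: (r) => r["quotaId"] == "1")

// Extract one record from the stream so that ._value can be read.
minRec = result
    |> aggregateWindow(every: 15m, fn: min, createEmpty: false)
    |> findRecord(fn: (key) => true, idx: 0)

array.from(rows: [{_time: now(), min: minRec._value}])
```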