Flux multiple aggregates

I wish to downsample our data using Flux.

For each point, we have two fields, Rank and TotalScore, plus a tag Id. For each 24-hour period, I wish to create a new point in a new measurement containing the mean score and the min, max and mean rank.

In InfluxQL I can calculate this data with:

SELECT mean("Rank") AS "RankMean", max("Rank") AS "RankMax", min("Rank") AS "RankMin", mean("TotalScore") AS "TotalScoreMean" FROM "testing"."autogen"."OurData" WHERE time > :dashboardTime: GROUP BY time(1d), "Id" FILL(null)

I have spent a day trying to figure out the equivalent using Flux. I felt that I should probably use the reduce() function, but I can’t get it to reset its identity for each group of values for each Id.

What would the equivalent of the above InfluxQL be in Flux?

One of my attempts, which returns the correct values but takes an order of magnitude longer to run, is:

  // Rank: mean, max and min per 1d window, per Id
  ranks = from(bucket: "testing/autogen")
    |> range(start: dashboardTime)
    |> filter(fn: (r) => r._measurement == "OurData" and r._field == "Rank")
    |> window(every: 1d)

  ranksmean = ranks
    |> mean()
    |> duplicate(column: "_stop", as: "_time")
    |> set(key: "_measurement", value: "Daily")
    |> set(key: "_field", value: "RankMean")
    |> drop(columns: ["_start", "_stop"])

  ranksmax = ranks
    |> max()
    |> toFloat()
    |> drop(columns: ["_time"])
    |> duplicate(column: "_stop", as: "_time")
    |> set(key: "_measurement", value: "Daily")
    |> set(key: "_field", value: "RankMax")
    |> drop(columns: ["_start", "_stop"])

  ranksmin = ranks
    |> min()
    |> toFloat()
    |> drop(columns: ["_time"])
    |> duplicate(column: "_stop", as: "_time")
    |> set(key: "_measurement", value: "Daily")
    |> set(key: "_field", value: "RankMin")
    |> drop(columns: ["_start", "_stop"])

  // TotalScore: mean per 1d window, per Id
  score = from(bucket: "testing/autogen")
    |> range(start: dashboardTime)
    |> filter(fn: (r) => r._measurement == "OurData" and r._field == "TotalScore")
    |> window(every: 1d)
    |> mean()
    |> duplicate(column: "_stop", as: "_time")
    |> set(key: "_measurement", value: "Daily")
    |> set(key: "_field", value: "ScoreMean")
    |> drop(columns: ["_start", "_stop"])

  union(tables: [ranksmin, ranksmax, ranksmean, score])
    |> group(columns: ["Id"])
    |> yield()
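For comparison, each of the streams above can be written more compactly with aggregateWindow(), which combines the window(), aggregate, duplicate() and un-window steps in a single call. This is only a sketch of the Rank mean stream (the variable name rankMeanDaily is a placeholder), and it has not been benchmarked against the version above:

  rankMeanDaily = from(bucket: "testing/autogen")
    |> range(start: dashboardTime)
    |> filter(fn: (r) => r._measurement == "OurData" and r._field == "Rank")
    |> aggregateWindow(every: 1d, fn: mean)    // window + mean + re-align _time + un-window
    |> set(key: "_measurement", value: "Daily")
    |> set(key: "_field", value: "RankMean")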

I have a similar issue. I’d like to know how to do the equivalent of GROUP BY host, time(1h) in Flux. group() allows grouping by multiple columns, but it doesn’t let you bucket time by a given duration (1h, 1d, etc.). window() does provide the time bucketing, but it isn’t clear how to combine window() and group() to get a GROUP BY host, time(1h)-like result.
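One approach that seems to work is to regroup by the tag first and then let aggregateWindow() handle the time bucketing. A minimal sketch, assuming hypothetical names (example-bucket, cpu, usage_user and host are placeholders):

  from(bucket: "example-bucket")
    |> range(start: -24h)
    |> filter(fn: (r) => r._measurement == "cpu" and r._field == "usage_user")
    |> group(columns: ["host"])               // one table per host, like GROUP BY host
    |> aggregateWindow(every: 1h, fn: mean)   // hourly buckets, like time(1h)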

push

Is it possible to apply multiple aggregate functions in a single pass over the data in Flux? If not, when will this be added?

@databender You can use reduce() to create custom aggregate functions and do multiple aggregations in a single pass. Here’s the documentation for creating custom aggregate functions: https://docs.influxdata.com/influxdb/v2.0/query-data/flux/custom-functions/custom-aggregate/
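Following that documentation, here is a sketch of a custom aggregate built with reduce() that computes count, sum, min and max in one pass and then derives the mean with map(). It assumes the field values are floats, and multiStats is just a placeholder name:

  multiStats = (tables=<-) => tables
    |> reduce(
      identity: {count: 0.0, sum: 0.0, min: 0.0, max: 0.0},
      fn: (r, accumulator) => ({
        count: accumulator.count + 1.0,
        sum: accumulator.sum + r._value,
        min: if accumulator.count == 0.0 then r._value else if r._value < accumulator.min then r._value else accumulator.min,
        max: if accumulator.count == 0.0 then r._value else if r._value > accumulator.max then r._value else accumulator.max
      })
    )
    |> map(fn: (r) => ({r with mean: r.sum / r.count}))

  from(bucket: "testing/autogen")
    |> range(start: dashboardTime)
    |> filter(fn: (r) => r._measurement == "OurData" and r._field == "Rank")
    |> window(every: 1d)                       // one table per Id per day
    |> multiStats()
    |> duplicate(column: "_stop", as: "_time") // reduce() drops _time, so restore it from _stop
    |> window(every: inf)                      // un-window back to one table per Id

Whether this single-pass version is actually faster than separate mean()/min()/max() streams will depend on the data and the Flux version, so it is worth benchmarking.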


Thank you very much.

I’m glad that this is possible.

It would probably still be a good idea to add a built-in (optimized) function for this case, as I suspect it is a pretty common one.
In InfluxQL this was much more straightforward.


Hi Scott, how do I aggregate avg/min/max with the reduce() function? I need to do multiple aggregations in one query, including avg, min, max and sum.

Thanks