Flux multiple aggregates

I wish to downsample our data using Flux.

For each point, we have two fields, Rank and TotalScore, plus a tag ‘Id’. For each 24-hour period, I wish to create a new point in a new measurement containing the mean score and the mean, min, and max rank.

In InfluxQL I can calculate this data with:

SELECT mean("Rank") AS "RankMean", max("Rank") AS "RankMax", min("Rank") AS "RankMin", mean("TotalScore") AS "TotalScoreMean" FROM "testing"."autogen"."OurData" WHERE time > :dashboardTime: GROUP BY time(1d), "Id" FILL(null)

I have spent a day trying to figure out the equivalent using Flux. I felt that I should probably use the reduce() function, but I can’t get it to reset its identity for each group of values for each id.

What would the equivalent of the above InfluxQL be in Flux?

One of my attempts, which returns the correct values but takes an order of magnitude longer to run, is:

ranks = from(bucket: "testing/autogen")
    |> range(start: dashboardTime)
    |> filter(fn: (r) => r._measurement == "OurData" and r._field == "Rank")
    |> window(every: 24h)

ranksmean = ranks
    |> mean()
    |> duplicate(column: "_stop", as: "_time")
    |> set(key: "_measurement", value: "Daily")
    |> set(key: "_field", value: "RankMean")
    |> drop(columns: ["_start", "_stop"])

ranksmax = ranks
    |> max()
    |> toFloat()
    |> drop(columns: ["_time"])
    |> duplicate(column: "_stop", as: "_time")
    |> set(key: "_measurement", value: "Daily")
    |> set(key: "_field", value: "RankMax")
    |> drop(columns: ["_start", "_stop"])

ranksmin = ranks
    |> min()
    |> toFloat()
    |> drop(columns: ["_time"])
    |> duplicate(column: "_stop", as: "_time")
    |> set(key: "_measurement", value: "Daily")
    |> set(key: "_field", value: "RankMin")
    |> drop(columns: ["_start", "_stop"])

score = from(bucket: "testing/autogen")
    |> range(start: dashboardTime)
    |> filter(fn: (r) => r._measurement == "OurData" and r._field == "TotalScore")
    |> window(every: 1d)
    |> mean()
    |> duplicate(column: "_stop", as: "_time")
    |> set(key: "_measurement", value: "Daily")
    |> set(key: "_field", value: "ScoreMean")
    |> drop(columns: ["_start", "_stop"])

union(tables: [ranksmin, ranksmax, ranksmean, score])
    |> group(columns: ["Id"])
    |> yield()

I have a similar issue… I’d like to know how to do a GROUP BY host, time(1h) in Flux. group() allows grouping by multiple columns, but it doesn’t aggregate time by a given interval (1h, 1d, etc.). window() does provide the time aggregation, but it’s confusing how to apply both window() and group() together to achieve a GROUP BY host, time(1h)-style combination. A sketch of what I mean is below.
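
Something like this is the shape I’d expect, though I haven’t verified it, and the bucket, measurement, and field names here are just placeholders: group by the host tag first, then window with aggregateWindow():

from(bucket: "example-bucket")
    |> range(start: -24h)
    |> filter(fn: (r) => r._measurement == "cpu" and r._field == "usage_percent")
    |> group(columns: ["host"])              // one table per host, like GROUP BY host
    |> aggregateWindow(every: 1h, fn: mean)  // then aggregate each host's table into 1h windows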


Is there a possibility to apply aggregate functions in a single run over the data in Flux? If not, when will it be added?

@databender You can use reduce() to create custom aggregate functions and do multiple aggregations in a single pass. Here’s the documentation for creating custom aggregate functions: Create custom aggregate functions | InfluxDB OSS 2.0 Documentation
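
As a rough sketch of the idea (assuming a bucket named "example-bucket" and the Rank field from the original question), something like this computes count, sum, min, max, and mean in a single pass over each daily window:

import "math"

from(bucket: "example-bucket")
    |> range(start: -7d)
    |> filter(fn: (r) => r._measurement == "OurData" and r._field == "Rank")
    |> toFloat()
    |> window(every: 1d)
    |> reduce(
        // seed min/max with +/- infinity so the first value always replaces them
        identity: {count: 0.0, sum: 0.0, min: math.mInf(sign: 1), max: math.mInf(sign: -1), mean: 0.0},
        fn: (r, accumulator) => ({
            count: accumulator.count + 1.0,
            sum: accumulator.sum + r._value,
            min: if r._value < accumulator.min then r._value else accumulator.min,
            max: if r._value > accumulator.max then r._value else accumulator.max,
            mean: (accumulator.sum + r._value) / (accumulator.count + 1.0),
        }),
    )
    |> duplicate(column: "_stop", as: "_time")

reduce() outputs one row per window with those five columns; the duplicate() at the end restores a _time column from each window’s stop time.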


Thank you very much.

I’m glad that this is possible.

It would probably still be a good idea to add a built-in (optimized) function for this case, as I suspect it is a pretty common one.
In InfluxQL this was much more straightforward.

Hi Scott, how do I aggregate avg/min/max with the reduce() function? I need to do a multiple aggregation that includes avg, min, max, and sum.

Thanks

Another basic use case is wanting to add a count column while doing another aggregate.

I know this thread has been around for a while, but a lot has changed in Flux since. For the original query, I created the following Flux query that:

  1. Creates a data() function to query the base data. I structure this as a function to optimize the query: as a function, the storage pushdowns can continue beyond the function call. If it were just a variable, the pushdowns wouldn’t work.
  2. Creates a custom aggregate function that windows and aggregates data by day, uses a specific aggregate function, and sets the field to a custom name.
  3. Unions the streams into a single stream.
  4. Pivots fields into columns so results are structured like you get from InfluxQL.
data = () =>
    from(bucket: "testing/autogen")
        |> range(start: v.timeRangeStart, stop: v.timeRangeStop)
        |> filter(fn: (r) => r._measurement == "OurData")

aggregate = (tables=<-, filterFn, agg, name) =>
    tables
        |> filter(fn: filterFn)
        |> aggregateWindow(every: 1d, fn: agg)
        |> set(key: "_field", value: name)

union(
    tables: [
        data() |> aggregate(filterFn: (r) => r._field == "Rank", agg: mean, name: "RankMean"),
        data() |> aggregate(filterFn: (r) => r._field == "Rank", agg: max, name: "RankMax"),
        data() |> aggregate(filterFn: (r) => r._field == "Rank", agg: min, name: "RankMin"),
        data() |> aggregate(filterFn: (r) => r._field == "TotalScore", agg: mean, name: "TotalScoreMean"),
    ],
)
    |> pivot(rowKey: ["_time"], columnKey: ["_field"], valueColumn: "_value")

@Erich_Spaker For adding a count column, you’d do something similar. For example:

data = () =>
    from(bucket: "example-bucket")
        |> range(start: v.timeRangeStart, stop: v.timeRangeStop)
        |> filter(fn: (r) => r._measurement == "example-measurement")
        |> filter(fn: (r) => r._field == "example-field")

aggregate = (tables=<-, fn, name) =>
    tables
        |> aggregateWindow(every: 1d, fn: fn)
        |> set(key: "_field", value: name)

union(
    tables: [
        data() |> aggregate(fn: mean, name: "example-field-mean"),
        data() |> aggregate(fn: count, name: "example-field-count"),
    ],
)
    |> pivot(rowKey: ["_time"], columnKey: ["_field"], valueColumn: "_value")

You can absolutely use reduce() to do this, but reduce() can’t be pushed down to the storage layer, so it can’t be optimized. With the approach above, you preserve the pushdown optimizations of the query. On larger queried datasets, it is MUCH more performant.
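
If you want to measure the difference yourself, Flux’s profiler package can report query-level and per-operator statistics; a minimal sketch, added to the top of whichever script you’re comparing:

import "profiler"

// enable query and operator profiling; the results are returned
// as extra tables alongside the normal query results
option profiler.enabledProfilers = ["query", "operator"]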

data = () =>
    from(bucket: v.defaultBucket)
        |> range(start: v.timeRangeStart, stop: v.timeRangeStop)
        |> filter(fn: (r) => r["_measurement"] == "Example-Data")
        |> aggregateWindow(every: v.windowPeriod, fn: mean)

tmp = (tables=<-, filterFn, nametmp, x, y, z, Tmpfactor_1=0.15) =>
    tables
        |> filter(fn: filterFn)
        |> map(fn: (r) => ({r with nametmp: (((x + y) / 2.0) - z - Tmpfactor_1)}))
        |> keep(columns: nametmp)

union(
    tables: [
        data()
            |> tmp(
                filterFn: (r) => r["_field"] == "04A" and r["_field"] == "03A" and r["_field"] == "02A",
                nametmp: "atmp",
                x: r["04A"],
                y: r["03A"],
                z: r["02A"],
            ),
    ],
)
    |> pivot(rowKey: ["_time"], columnKey: ["_field"], valueColumn: "_value")
    |> yield()

Hello @scott ,

I am trying to create a custom function called "tmp", which:

  1. filters (filterFn) the columns named "04A", "03A", and "02A"
  2. maps a new column (nametmp) as "atmp", which performs the math ((x + y) / 2.0) - z - Tmpfactor_1, where x, y, and z are the columns filtered in the first step (04A, 03A, 02A). Tmpfactor_1 is always set to 0.15.
  3. keeps the column "nametmp" that we newly mapped.

Is there an error in this code, or can multiple basic math operations not be performed within custom functions?

Please let me know if there is something I am missing.

There are a few things here:

  1. With the current Flux syntax, you can’t dynamically name or reference columns in raw Flux. For example, {r with nametmp: ...} will not work. This will change soon, but as it stands, you can’t do this. You have to statically name the column.
  2. I wouldn’t define tmp() as a transformation (one that takes a stream of tables as input and outputs a stream of tables). I would define it as a “static” function and call that function in map().
  3. To do operations across fields, you have to pivot the fields into columns first. Currently, you aren’t doing that.
  4. Since you only have one stream of tables, there’s no need for a union.
tmp = (x, y, z, Tmpfactor_1=0.15) =>
    (((x + y) / 2.0) - z - Tmpfactor_1)

from(bucket: v.defaultBucket)
    |> range(start: v.timeRangeStart, stop: v.timeRangeStop)
    |> filter(fn: (r) => r["_measurement"] == "Example-Data")
    |> filter(fn: (r) => r["_field"] == "04A" or r["_field"] == "03A" or r["_field"] == "02A")
    |> aggregateWindow(every: v.windowPeriod, fn: mean)
    |> pivot(rowKey: ["_time"], columnKey: ["_field"], valueColumn: "_value")
    |> map(fn: (r) => ({r with nametmp: tmp(x: r["04A"], y: r["03A"], z: r["02A"])}))
    |> keep(columns: ["nametmp"])

Hey,
I used your solution to calculate values like mean, max, min, etc. for my data. It works perfectly for me except for the pivot() command: somehow it doesn’t put all the values in one table, and it seems random which column ends up in a separate table.

data = () => 
  from(bucket: "Bucket")
    |> range(start: v.timeRangeStart, stop: v.timeRangeStop)
    |> filter(fn: (r) => r["_measurement"] == "e5")
    |> filter(fn: (r) => r["_field"] == "ew")

aggregate = (tables=<-, agg, name) =>
    tables
        |> aggregateWindow(every: 1h, fn: agg)
        |> set(key: "_field", value: name)    

union(
    tables: [
        data() |> aggregate(agg: mean, name: "mean"),
        data() |> aggregate(agg: max, name: "upperfence"),
        data() |> aggregate(agg: min, name: "lowerfence"),
        data() |> aggregate(agg: stddev, name: "sd"),
        data() |> aggregate(agg: (column, tables=<-) => tables |> quantile(q: 0.25, column: "_value"), name: "q1"),
        data() |> aggregate(agg: median, name: "median"),
        data() |> aggregate(agg: (column, tables=<-) => tables |> quantile(q: 0.75, column: "_value"), name: "q3"),
    ],
)
    |> pivot(rowKey: ["_time"], columnKey: ["_field"], valueColumn: "_value")

At some point I added this:

|> group(columns: ["_measurement"])

That got everything into one single table with all the columns, but now there are duplicate timestamps.

Any ideas?