Multiple Aggregation Projections in fluxql

Hi,

Recreating some of the InfluxQL basics in Flux is quite a stretch at present.

I can’t work out how to ask for multiple projections.

select mean(temp) as temp_mean, count(temp) as num_points from my_measurement

I’m not sure how to go about this in Flux.

Pointers appreciated.

Thanks

Rob

Hello @robshep952,
Welcome! Great question.
One way is to compute each aggregate in its own stream and then join the results:

countData = from(bucket: "cats-and-dogs")
  |> range(start: v.timeRangeStart, stop: v.timeRangeStop)
  |> filter(fn: (r) => r["_measurement"] == "bunny" or r["_measurement"] == "cats")
  |> filter(fn: (r) => r["_field"] == "young")
  |> count(column: "_value")

meanData = from(bucket: "cats-and-dogs")
  |> range(start: v.timeRangeStart, stop: v.timeRangeStop)
  |> filter(fn: (r) => r["_measurement"] == "bunny" or r["_measurement"] == "cats")
  |> filter(fn: (r) => r["_field"] == "young")
  |> mean(column: "_value")

// count() and mean() drop the _time column, so join on the group key columns instead
join(tables: {mean: meanData, count: countData}, on: ["_start", "_stop", "_measurement", "_field"], method: "inner")

However, I feel like there should be a way to do this without joins. I'm looking into it and I'll get back to you if I learn anything. Thank you!
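To make the join concrete, here is a small Python model (not Flux, and not how InfluxDB executes the query) of what the two streams and the inner join produce. The rows are made up, and grouping by (measurement, field) stands in for Flux's per-series tables. It also shows why the join key should be the group-key columns: mean() and count() return one row per table with no _time column, so there is no _time to match on.

```python
from statistics import mean

# Hypothetical rows (measurement, field, value) for one query window.
rows = [
    ("bunny", "young", 2.0),
    ("bunny", "young", 4.0),
    ("cats", "young", 3.0),
]


def aggregate(rows, fn):
    """Group rows by (measurement, field) and apply fn to each group's values,
    roughly what count() or mean() does to each Flux table."""
    groups = {}
    for measurement, field, value in rows:
        groups.setdefault((measurement, field), []).append(value)
    return {key: fn(values) for key, values in groups.items()}


means = aggregate(rows, mean)
counts = aggregate(rows, len)

# Inner join on the shared group key; the two value columns get suffixes,
# like _value_mean and _value_count in the Flux join output.
joined = [
    {
        "_measurement": m,
        "_field": f,
        "_value_mean": means[(m, f)],
        "_value_count": counts[(m, f)],
    }
    for (m, f) in means
    if (m, f) in counts
]
```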

There are a few things you could do.

1. Union and pivot two streams:

data = from(bucket: "example-bucket")
  |> range(start: -1d)
  |> filter(fn: (r) => r._field == "temp")

temp_mean = data |> mean() |> set(key: "_field", as: "temp_mean")
num_points = data |> count() |> set(key: "_field", as: "num_points")

// mean() and count() drop _time, so use _start as the row key
union(tables: [temp_mean, num_points])
  |> pivot(rowKey: ["_start"], columnKey: ["_field"], valueColumn: "_value")

2. Use reduce() to create a custom aggregate that computes both in a single pass:

from(bucket: "example-bucket")
  |> range(start: -1d)
  |> filter(fn: (r) => r._field == "temp")
  |> reduce(
    identity: {num_points: 0, sum: 0.0, temp_mean: 0.0},
    fn: (r, accumulator) => ({
      num_points: accumulator.num_points + 1,
      sum: r._value + accumulator.sum,
      // running mean from the updated sum and count
      temp_mean: (accumulator.sum + r._value) / float(v: accumulator.num_points + 1)
    })
  )
  |> drop(columns: ["sum"])
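The reduce() option keeps a running count and sum and derives the mean from them on every row. Below is a Python model of that accumulator logic using functools.reduce and made-up readings; it is a sketch of the semantics only, not Flux.

```python
from functools import reduce

# Hypothetical temperature readings for one series.
temps = [20.0, 22.0, 27.0]


def step(acc, value):
    """One reduce() step: update count and sum, then recompute the mean
    from the updated values, as the Flux fn does with accumulator fields."""
    num_points = acc["num_points"] + 1
    total = acc["sum"] + value
    return {"num_points": num_points, "sum": total, "temp_mean": total / num_points}


identity = {"num_points": 0, "sum": 0.0, "temp_mean": 0.0}
result = reduce(step, temps, identity)
result.pop("sum")  # mirrors drop(columns: ["sum"])
```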

Thanks for the explanation. I tried the above query for my use case, but it gives an error saying data is an undefined identifier.
Can you tell me the significance of data and why it gives this error?
I used max and min instead of sum. Is that the correct way?
from(bucket: "atlas_om")
  |> range(start: v.timeRangeStart, stop: v.timeRangeStop)
  |> filter(fn: (r) => r._measurement == "measurement_name" and r._field == "column_name")
  |> filter(fn: (r) => r["host"] == "26e286667de4")
  max_column_name = data |> max() set(key: "_field", as: "max")
  min_column_name = data |> min() set(key: "_field", as: "min")
  union(table:[max_column_name ,min_column_name ])
    |> pivot(rowKey: ["_time"], columnKey: ["_field"], valueColumn: "_value")

I tried a different method, but it also gives an error saying temp is an undefined identifier. I am not sure this is the correct way to do it.
from(bucket: "atlas_om")
  |> range(start: v.timeRangeStart, stop: v.timeRangeStop)
  |> filter(fn: (r) => r._measurement == "measurement_name" and r._field == "column_name")
  |> filter(fn: (r) => r["host"] == "26e286667de4")
  |> reduce(
    identitiy: {max_temp, min_temp, temp},
    fn: (r, accumulator) => ({
      max_temp: r._value.max(),
      min_temp: r._value.min(),
      temp: max_temp - min_temp
    })
  )
  |> drop(columns: ["max_temp", "min_temp"])
  |> window(every: 1h)
  |> yield(name: "differance")

Hello @Ravikant_Gautam,
To help you, it would be easier if you could explain what you're trying to do.
It looks like you're trying to find the difference between the max and min values in a table, right? For one field, or across all fields?

data = from(bucket: "my-bucket")
  |> range(start: -5m)
  |> filter(fn: (r) => r["_measurement"] == "cpu")
  |> filter(fn: (r) => r["_field"] == "usage_system" or r["_field"] == "usage_user")
  |> filter(fn: (r) => r["cpu"] == "cpu-total")
  |> limit(n:3)

min = 
  data 
  |> min()
  
max = 
  data
  |> max() 


join(tables: {min: min, max: max}, on: ["_start"], method: "inner")
  |> map(fn: (r) => ({ r with _value: r._value_max - r._value_min }))

The script above would calculate the max-min across fields.
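The distinction in the question matters because Flux aggregates each table separately. This Python model (hypothetical values; not Flux) contrasts max - min computed per field with max - min over all fields together, which in Flux would roughly correspond to merging the tables with group() before calling min() and max().

```python
# Hypothetical (field, value) rows, like usage_system and usage_user after filter().
rows = [
    ("usage_system", 1.5), ("usage_system", 3.0), ("usage_system", 2.0),
    ("usage_user", 10.0), ("usage_user", 12.5), ("usage_user", 11.0),
]


def spread_per_field(rows):
    """max - min computed separately for each field (one table per field)."""
    by_field = {}
    for field, value in rows:
        by_field.setdefault(field, []).append(value)
    return {field: max(vals) - min(vals) for field, vals in by_field.items()}


def spread_across_fields(rows):
    """max - min over every value, as if all tables were merged into one."""
    values = [value for _, value in rows]
    return max(values) - min(values)
```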

Alternatively, this might be useful to you?

Hi Anaisdg,

How do I do min/max with the reduce() function?

Thanks

Hello @Nam_Giang,
I wouldn't recommend it, because reduce() should really be reserved for custom aggregators. Using from() + range() + filter() + a bare aggregate or selector (like min() or max()) is a pushdown pattern in InfluxDB, meaning the transformation work is executed in storage rather than in memory, which is much more computationally efficient.
Please see:

But… here you go (I just wrapped it in a function):

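minMaxMean = (tables=<-) =>
  tables
    |> reduce(
      // fn must return every key in identity, so sum is updated too
      identity: {count: 0.0, sum: 0.0, min: 0.0, max: 0.0, mean: 0.0},
      fn: (r, accumulator) => ({
        count: accumulator.count + 1.0,
        sum: accumulator.sum + r._value,
        // count == 0.0 marks the first record, which seeds min and max
        min: if accumulator.count == 0.0 then r._value else if r._value < accumulator.min then r._value else accumulator.min,
        max: if accumulator.count == 0.0 then r._value else if r._value > accumulator.max then r._value else accumulator.max,
        mean: (accumulator.sum + r._value) / (accumulator.count + 1.0)
      })
    )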
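The count == 0.0 checks are what make the min and max branches correct: without them, the 0.0 identity would win every min comparison for all-positive data. Here is a Python model of the same single-pass logic, a sketch of the semantics with made-up inputs rather than Flux:

```python
from functools import reduce


def min_max_mean(values):
    """Single-pass model of the minMaxMean reduce(): count == 0 marks the first
    record, which seeds min and max instead of comparing against the identity."""

    def step(acc, v):
        first = acc["count"] == 0.0
        count = acc["count"] + 1.0
        total = acc["sum"] + v
        return {
            "count": count,
            "sum": total,
            "min": v if first or v < acc["min"] else acc["min"],
            "max": v if first or v > acc["max"] else acc["max"],
            "mean": total / count,
        }

    identity = {"count": 0.0, "sum": 0.0, "min": 0.0, "max": 0.0, "mean": 0.0}
    return reduce(step, values, identity)
```

For example, min_max_mean([4.0, 8.0]) reports a min of 4.0 rather than the 0.0 from the identity.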

Hi Anaisdg,

In my data, I need to aggregate multiple fields to find their min/max/avg/sum. I'm not sure what the best option is, because multiple unions also aren't a good option in terms of performance.

Hello @Nam_Giang,
Can you just use multiple yield statements? Something like:

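// hypothetical bucket, time range, and field names A and B; replace with your own
data = from(bucket: "my-bucket")
  |> range(start: -1h)
  |> filter(fn: (r) => r._field == "A" or r._field == "B")
  // add group() where you want to find min/max/sum/avg across multiple fields

data
  |> yield(name: "raw")

data
  |> min()
  |> yield(name: "min")

data
  |> max()
  |> yield(name: "max")

data
  |> mean()
  |> yield(name: "avg")

data
  |> sum()
  |> yield(name: "sum")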

Hi Anaisdg,

Since I have this:

...
|> schema.fieldsAsCols()
|> group(columns: ["_measurement"], mode:"by")
...

When I run data |> min() or max(), it says there's no _value column, which is true because I used schema.fieldsAsCols().

Ultimately, I need a table like this, with A and B being fields, grouped by _measurement:

time, _measurement, rawA, rawB, minA, maxA, avgA, minB, maxB, avgB
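To pin down the shape being asked for, here is a Python model (hypothetical rows and integer timestamps; not a Flux solution) that keeps each raw row and attaches the per-_measurement min, max, and average of every field, in the column order of the header above.

```python
from statistics import mean

# Hypothetical rows after schema.fieldsAsCols(): one row per time, fields A and B as columns.
rows = [
    {"_time": 1, "_measurement": "m1", "A": 1.0, "B": 10.0},
    {"_time": 2, "_measurement": "m1", "A": 3.0, "B": 20.0},
    {"_time": 1, "_measurement": "m2", "A": 5.0, "B": 7.0},
]


def widen(rows, fields=("A", "B")):
    """Attach the per-_measurement min/max/avg of each field to every raw row."""
    groups = {}
    for row in rows:
        groups.setdefault(row["_measurement"], []).append(row)
    out = []
    for measurement, group in groups.items():
        stats = {}
        for f in fields:
            vals = [r[f] for r in group]
            stats[f"min{f}"] = min(vals)
            stats[f"max{f}"] = max(vals)
            stats[f"avg{f}"] = mean(vals)
        for r in group:
            raw = {f"raw{f}": r[f] for f in fields}
            out.append({"time": r["_time"], "_measurement": measurement, **raw, **stats})
    return out
```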