How to apply different aggregation functions to different fields from a query

Hi Team,
I am currently using Grafana to render time series data stored in InfluxDB.

I have a query that returns a number of fields and, from my understanding, each of these fields will constitute a separate table/series, since the field key is a component of the series key. The task is to apply a different aggregation function to each series. So if the fields are last, min and max, I would like to apply aggregateWindow with the functions last, min and max respectively.

I stumbled on the post "InfluxQL to new task FLUX with resample and group", which covers a similar case, but I have a few questions about it.

amount = price = from(bucket: "mydb")
  |> range(start: -10m)
  |> filter(fn: (r) => r._measurement == "trades" and r._field == "amount")
  1. Why do we assign to price and then to amount above?
  2. Given that I am visualizing from Grafana, do I still need to create a task? I suspect my use case might be less complex than the use case in the referenced post.

Many thanks in advance.

@ValentineO,
Hello, that was just a typo.
  1. It should be:

amount = from(bucket: "mydb")
  |> range(start: -10m)
  |> filter(fn: (r) => r._measurement == "trades" and r._field == "amount")
  2. If you just want to visualize the last, min, and max, then no, you don’t need to create a task, unless you’re performing this aggregation and running into slow queries. In that case you might want to create a task and write the aggregated data back to a bucket. But I would try without a task first.

Please note that you can use reduce like in the example shown, but you can also do:

data = from(bucket: "mydb")
  |> range(start: -10m)
  |> filter(fn: (r) => r._measurement == "ur measurement")
  
lastData = data |> last() |> yield(name: "last") 
minData = data |> min() |> yield(name: "min") 
maxData = data |> max() |> yield(name: "max") 

Data is automatically grouped by individual series in 2.x, so when you apply a min, last, or max function to that data, you get the aggregated value for each series.
By contrast, you can use the group() function to regroup the data first and then compute aggregations across multiple tags or fields.
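
As a rough sketch of the group() approach (assuming the "mydb" bucket and "trades" measurement from the question; adjust to your own schema), regrouping by _field merges all series that share a field, regardless of their tags, so max() returns one row per field:

```flux
from(bucket: "mydb")
    |> range(start: -10m)
    |> filter(fn: (r) => r._measurement == "trades")
    // Regroup so that each table holds every series of one field,
    // whatever its tag values are.
    |> group(columns: ["_field"])
    // max() now runs once per field instead of once per series.
    |> max()
```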

You can also use aggregateWindow() if you want to calculate aggregates over windows of time.
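
A minimal sketch of a windowed aggregate, assuming the "amount" field of the "trades" measurement from the question and an arbitrary 1-minute window (pick whatever window suits your graph):

```flux
from(bucket: "mydb")
    |> range(start: -10m)
    |> filter(fn: (r) => r._measurement == "trades" and r._field == "amount")
    // Split each series into 1-minute windows and keep the
    // maximum value of each window.
    |> aggregateWindow(every: 1m, fn: max)
```

In a Grafana panel you would likely use v.windowPeriod instead of the fixed 1m so the window follows the dashboard's time range.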

Hi @Anaisdg,
Thanks for your reply. I do indeed want to use aggregateWindow to downsample each of the series before displaying them on the graph. In your sample code, how is the query able to apply last() to the field/series named last, min() to the field/series named min and max() to the field/series named max without referencing r._field?

I am still finding my way around Flux, but it seems like the solution applies min, max and last to every series. What I want is to apply min to the series min, max to the series max and last to the series last, and then plot the resulting values.

Apologies if I have misunderstood.
Val.

Hi @Anaisdg or @scott
I would greatly appreciate your help in clarifying how an expression like the one below is evaluated:

lastData = data |> last() |> yield(name: "last")
  1. Say my data has a stream of 3 tables/series. The first is from a field named last, the second is a field named min and the third is a field named max.
  2. I pipe that data to the last() function. The Flux documentation for last() says that the last row from each of the tables in the stream will be returned (i.e. the last row from the last series, the last row from the min series and the last row from the max series), and likewise for min() and max().

What I would like is to use an aggregateWindow to return the last in some window from the last series, the min in some window from the min series and the max in some window from the max series. Is there a way I can achieve this or perhaps your solution already does this?

Many thanks in advance.
Val.

@ValentineO Just to clarify your use case:

  • You already have fields in the database called last, min, and max? And you want to apply aggregate selector functions to each of these existing fields (for example: field_last |> last(), field_min |> min(), field_max |> max())?

  • Or do you want to apply an aggregate/selector function and then store the aggregate value as a new field named last, min, max respectively?

Hi @scott,
The first scenario matches my case precisely.

Cheers.

Ok, this is how I’d do it:

// Define the base data set as a function to preserve pushdowns and
// optimize performance
baseData = () =>
    from(bucket: "mydb")
        |> range(start: v.timeRangeStart, stop: v.timeRangeStop)
        |> filter(fn: (r) => r._measurement == "trades")

// Define a function to get a specific field from the base dataset
// and apply an aggregate function to it.
getSet = (tables=<-, field, fn) =>
    tables
        |> filter(fn: (r) => r._field == field)
        |> aggregateWindow(fn: fn, every: v.windowPeriod)

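// Use distinct variable names so the built-in last, min, and max
// functions are not shadowed.
lastData = baseData() |> getSet(field: "last", fn: last)
minData = baseData() |> getSet(field: "min", fn: min)
maxData = baseData() |> getSet(field: "max", fn: max)

union(tables: [lastData, minData, maxData])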

Much appreciated @scott .
Val.