Flux: aggregation function using result of another aggregation function

Hi,

I am quite new to the Flux language, and I have a question.
I have a simple dataset with just one value.
I am using a window to aggregate by a chosen interval and store the result in a table variable like this:

val=from(bucket: "myBucket")
  |> range(start: v.timeRangeStart, stop: v.timeRangeStop)
  |> filter(fn: (r) => r._measurement == "myMeasurement")
  |> drop(columns:["_measurement"])
  |> window(every: 5m)

Then I can apply the functions that matter to me, such as min, max, mean and stddev, like this:

minval=val
  |> min()
  |> set(key: "_field", value: "min")

maxval=val
  |> max()
  |> set(key: "_field", value: "max")

avgval=val
  |> mean()
  |> set(key: "_field", value: "avg")

sdval=val
  |> stddev()
  |> set(key: "_field", value: "stddev")

And to finish, I can concatenate all of this, polish up the windowing and make a nice representation:

union(tables: [minval, maxval, avgval, sdval])
  |> duplicate(column: "_stop", as: "_time")
  |> drop(columns:[ "_start", "_stop"])
  |> pivot(rowKey: ["_time"], columnKey: ["_field"], valueColumn: "_value")

Now, I would like to calculate two different stddevs, one for the values above the mean and one for the values below it, as described here: https://jdmeducational.com/1-2-or-3-standard-deviations-above-or-below-the-mean/

It seems easy; all I would need is something like this:

lowsdval=val
  |> filter(fn: (r) => r._value < MEAN)
  |> stddev()

I am very new to Flux, and I have not found a way to do that (it is probably super simple).

I can do it easily with a constant like this (assuming the mean value is 50):

lowsdval=val
  |> filter(fn: (r) => r._value < 50)
  |> stddev()

Now, how can I achieve this with the value of the mean of the current window? It is even already calculated in the avgval table, so I suppose I just need to extract it from it with some syntax I missed.

Thank you in advance for any help or hint.

@bolemo

I agree this should be simple. I also do not know the syntax, but maybe one of these?

|> filter(fn: (r) => r._value < "avgval")
or
|> filter(fn: (r) => r._value < "avg")

@grant1 , I believe that “avgval” or “avg” would be read as literal strings.

I am trying to use a function like this:

getAvg = (time) => {
  extract=avglat
    |> filter(fn: (r) => r._stop == time)
//    |> findRecord(fn: (key) => true, idx: 0)
    |> findColumn(fn: (key) => true, column: "_value")

    return extract[0]
}

But I have another problem with this: whether I use findRecord or findColumn, it times out.
But the result of

extract=avglat
    |> filter(fn: (r) => r._stop == time)

is as expected (for example, with time set to a valid value such as 2022-11-29T09:25:00.000Z):

_value       _start                       _stop                        _field
-----------------------------------------------------------------------------
11.3         2022-11-29 10:20:00 GMT+1    2022-11-29 10:25:00 GMT+1    avg

But I am unable to extract the scalar float value 11.3 from that simple single-row table…
I am not sure whether I am doing something wrong or whether this is a bug in findRecord / findColumn.
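For reference, here is a minimal sketch of the scalar extraction I am aiming for, run on small in-memory tables instead of my bucket. The tables built with array.from and the names avgRow, rec and meanValue are only illustrative stand-ins, not my real data:

```flux
import "array"

// Hypothetical stand-in for the filtered, single-row avglat table
avgRow = array.from(rows: [{_stop: 2022-11-29T09:25:00Z, _field: "avg", _value: 11.3}])

// findRecord returns the row at index idx from the first table
// whose group key satisfies fn (here: any table)
rec = avgRow |> findRecord(fn: (key) => true, idx: 0)
meanValue = rec._value

// Hypothetical sample values, filtered against the extracted scalar
array.from(rows: [{_value: 9.0}, {_value: 11.0}, {_value: 14.5}])
    |> filter(fn: (r) => r._value < meanValue)
    |> stddev()
```

This only yields one mean for the whole stream, so it does not by itself give a different mean for each window.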

It appears that this line is responsible for the timeout:
|> drop(columns:["_measurement"])

However, even with that line removed, the CPU load is high…
So I created a task that calculates the average, min, max and stddev every minute and stores them in a separate bucket. I then only have to query this new bucket to represent the data, which is a lot faster than using union.

option task = {name: "Calculate", cron: "* * * * *"}

data =
    from(bucket: "myBucket")
        |> range(start: -2m, stop: -1m)
        |> timeShift(duration: -1m)
        |> filter(fn: (r) => r._measurement == "myMeasurement")

data
    |> mean()
    |> to(
        bucket: "myCalculatedBucket",
        timeColumn: "_stop",
        fieldFn: (r) => ({"avg": r._value}),
    )

data
    |> min()
    |> to(
        bucket: "myCalculatedBucket",
        timeColumn: "_stop",
        fieldFn: (r) => ({"min": r._value}),
    )

data
    |> max()
    |> to(
        bucket: "myCalculatedBucket",
        timeColumn: "_stop",
        fieldFn: (r) => ({"max": r._value}),
    )

data
    |> stddev()
    |> to(
        bucket: "myCalculatedBucket",
        timeColumn: "_stop",
        fieldFn: (r) => ({"stddev": r._value}),
    )

And to represent the data, I simply use:

import "math"
wPeriod = duration(v: int(v: math.mMax(x: float(v: int(v: v.windowPeriod) * 10), y: 60000000000.0)))

from(bucket: "myCalculatedBucket")
  |> range(start: v.timeRangeStart, stop: v.timeRangeStop)
  |> filter(fn: (r) => r._measurement == "myMeasurement")
  |> aggregateWindow(every: wPeriod,fn: mean)
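
Coming back to the original question, a per-window comparison can probably be done without extracting a scalar at all, by joining each window with its own mean. The following is only a sketch, assuming a single series with no extra tags; the names avgPerWindow and "lowstddev" are made up for the example, and I have not measured how the join performs on large ranges:

```flux
val = from(bucket: "myBucket")
    |> range(start: v.timeRangeStart, stop: v.timeRangeStop)
    |> filter(fn: (r) => r._measurement == "myMeasurement")
    |> window(every: 5m)

// One row per window, holding that window's mean in _value
avgPerWindow = val
    |> mean()

// Attach each window's mean to every row of the same window.
// Columns present in both inputs but not listed in "on" get a
// suffix: _value_d is the raw value, _value_m is the window mean.
lowsdval = join(tables: {d: val, m: avgPerWindow}, on: ["_start", "_stop", "_field"])
    |> filter(fn: (r) => r._value_d < r._value_m)
    // Regroup explicitly so stddev runs once per window
    |> group(columns: ["_start", "_stop", "_field"])
    |> stddev(column: "_value_d")
    |> rename(columns: {_value_d: "_value"})
    |> set(key: "_field", value: "lowstddev")
```

The stddev of the values above the mean would be the same pipeline with the comparison in the filter reversed.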