Trying to build a set of percentiles or quantiles in one query, possible bug when using aggregateWindow with a custom function?

I’ve created the following Flux query to try to build many percentiles into one table stream:

import "experimental/array"

// Grab all the wind direction data for the given range
data = from(bucket: "fallen-leaf")
  |> range(start: v.timeRangeStart, stop: v.timeRangeStop)
  |> filter(fn: (r) => r["entity_id"] == "ws2032_28817_wd")
  |> filter(fn: (r) => r["_field"] == "value")
  |> filter(fn: (r) => r["_measurement"] == "°")

// The percentiles I want to calculate
p = [1,5,10,15,20,25,30,35,40,45,50,55,60,65,70,75,80,85,90,95,99]

// For each percentile, create a table that aggregates the given percentile into one-hour windows
percentileTables = p 
  |> array.map(fn: (x) => {
    return (
      data 
        |> aggregateWindow(
            column: "_value",
            every: 1h,
            fn: (column, tables=<-) => tables |> quantile(q: float(v: x)/100.0),
        )
        |> set(key: "_field", value: "p${x}")
    )
  })

// Combine the percentiles and pivot them into a column for each percentile
union(tables: percentileTables)
  |> pivot(rowKey: ["_time"], columnKey: ["_field"], valueColumn: "_value")

When I try to run this query in a Notebook I get the following error:

error calling function "map" @38:12-50:10: pipe parameter value provided to function with no pipe parameter defined

If I change the fn that is passed to aggregateWindow to just fn: mean, the query works, though the data isn’t what I’m looking for. I believe there might be a bug in how the custom function and its piped-in argument named tables behave.

Also, I’m open to suggestions if there is a different way to structure this query to get the answer I’m looking for.

Hi @jaxzin,
This should do the job:

import "experimental/array"

// Grab all the wind direction data for the given range
data = () => from(bucket: "plantbuddy")
  |> range(start: v.timeRangeStart, stop: v.timeRangeStop)
  |> filter(fn: (r) => r["_measurement"] == "sensor_data")
  |> filter(fn: (r) => r["_field"] == "soil_temperature")


q = (v) => {
  d = data()
    |> aggregateWindow(
        column: "_value",
        every: 1h,
        fn: (column, tables=<-) => tables |> quantile(q: v, column: column),
    )
    |> set(key: "_field", value: "p${v}")
  return d
}

// The percentiles I want to calculate
p = [0,1]
union(tables: p
  |> array.map(fn: (x) => q(v: float(v: x))))

The only catch is that the quantile must be between 0 and 1, so your array will not compute as written because its values exceed that maximum. See: quantile() function | Flux 0.x Documentation
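
As a rough illustration of that 0-to-1 constraint, whole-number percentiles can be divided by 100.0 before being passed as q. The sketch below only builds a small lookup table with experimental/array and does not read from any bucket; the column names percentile and q are made up for the example:

```flux
import "experimental/array"

// Whole-number percentiles, as in the original question (shortened here)
percentiles = [1, 50, 99]

// Convert each percentile to the 0..1 fraction that quantile() expects.
// The record field names "percentile" and "q" are illustrative only.
rows = percentiles
    |> array.map(fn: (x) => ({percentile: x, q: float(v: x) / 100.0}))

// Turn the array of records into a table so the result can be inspected
array.from(rows: rows)
```

The same float(v: x) / 100.0 expression could be used directly in the q argument of quantile() instead of precomputing a table.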

Thank you, that was the guidance I needed to get to a query that works.

This is the final version that I landed on:

import "experimental/array"

period = 1h

// Grab all the wind direction data for the given range
data = () => from(bucket: "fallen-leaf")
  |> range(start: v.timeRangeStart, stop: v.timeRangeStop)
  |> filter(fn: (r) => r["entity_id"] == "ws2032_28817_wd")
  |> filter(fn: (r) => r["_field"] == "value")
  |> filter(fn: (r) => r["_measurement"] == "°")
  |> map(fn: (r) => ({r with _value: r._value}))
  |> aggregateWindow( // wind data is limited to 45 deg increments, so average it to smooth it
    column: "_value", 
    every: duration(v: int(v: period) / 12), 
    fn: mean
  )

// The percentiles I want to calculate
p = [1,5,10,15,20,25,30,35,40,45,50,55,60,65,70,75,80,85,90,95,99]

q = (v) => {
  d = data()
    |> aggregateWindow(
        column: "_value",
        every: period,
        fn: (column, tables=<-) => tables |> quantile(q: float(v: v)/100.0, column: column, method: "estimate_tdigest"),
    )
    |> set(key: "_field", value: "p${if v < 10 then "0" else ""}${v}")
  return d
}

union(tables: p
  |> array.map(fn: (x) => q(v: x)))
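
If one column per percentile is wanted, as in the original question, the union result can presumably be piped into pivot(). A minimal sketch, using hand-written sample rows (hypothetical values standing in for the union() output, so it runs without the fallen-leaf bucket):

```flux
import "experimental/array"

// Hypothetical rows shaped like the union() output: one row per
// window and per percentile label. The values are invented.
sample = array.from(
    rows: [
        {_time: 2023-01-01T00:00:00Z, _field: "p05", _value: 10.0},
        {_time: 2023-01-01T00:00:00Z, _field: "p50", _value: 180.0},
        {_time: 2023-01-01T01:00:00Z, _field: "p05", _value: 12.0},
        {_time: 2023-01-01T01:00:00Z, _field: "p50", _value: 175.0},
    ],
)

// One row per _time, one column per percentile label
sample
    |> pivot(rowKey: ["_time"], columnKey: ["_field"], valueColumn: "_value")
```

The zero-padded labels (p01, p05, ...) keep the pivoted columns in numeric order when they are sorted alphabetically.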