I’ve got a measurement with values tagged either A or B. I want to process these values mostly the same except for a single operation that needs to be different (aggregate window functions “mean” or “last” depending on the tag value).
I do not want to use a union of distinct processing pipelines because I only need a single operation to be different and because of the (assumed) additional overhead.
I realize you can provide a function to aggregateWindow. In theory this would be perfect. However, I am unable to discriminate between measurement values based on tag values. What I’ve tried is this:
|> aggregateWindow(
every: 300s,
fn: (column, tables=<-) => if "_field\" == 'A' then mean(tables) else last(tables),
createEmpty: false)
Is there a way to do this?
If the above approach doesn’t work, is there an alternative approach that does?
Hello @00000000000000000000,
Welcome!
I have a sneaking suspicion that you might just want to use multiple yield() functions like:
mydata = from(bucket: "airsensor")
|> range(start: v.timeRangeStart, stop: v.timeRangeStop)
|> filter(fn: (r) => r["_measurement"] == "airSensors")
|> filter(fn: (r) => r["sensor_id"] == "TLM0101")
co = mydata |> filter(fn: (r) => r["_field"] == "co") |> aggregateWindow(every:30s, fn: mean, createEmpty:false)
yield(name:"if A then mean")
//won't yield results unless another field exists
humidity = mydata |> filter(fn: (r) => r["_field"] != "co") |> aggregateWindow(every:30s, fn: last, createEmpty:false)
yield(name:"if not A then last")
//won't yield results unless another field exists
Does that do what you want? Or am I misunderstanding?
1 Like
Thank you @Anaisdg,
You are correct. However, multiple yield() do not appear to be faster (or slower) compared to using a single union(). We were hoping to reduce query time, though.
On that note: writing…
mydata = from(bucket: "airsensor")
|> range(start: v.timeRangeStart, stop: v.timeRangeStop)
|> filter(fn: (r) => r["_measurement"] == "airSensors")
|> filter(fn: (r) => r["sensor_id"] == "TLM0101")
co = mydata |> filter(fn: (r) => r["_field"] == "co") |> aggregateWindow(every:30s, fn: mean, createEmpty:false)
yield(name:"if A then mean")
humidity = mydata |> filter(fn: (r) => r["_field"] != "co") |> aggregateWindow(every:30s, fn: last, createEmpty:false)
yield(name:"if not A then last")
…instead of…
co = from(bucket: "airsensor")
|> range(start: v.timeRangeStart, stop: v.timeRangeStop)
|> filter(fn: (r) => r["_measurement"] == "airSensors")
|> filter(fn: (r) => r["sensor_id"] == "TLM0101") |> filter(fn: (r) => r["_field"] == "co") |> aggregateWindow(every:30s, fn: mean, createEmpty:false)
yield(name:"if A then mean")
humidity = from(bucket: "airsensor")
|> range(start: v.timeRangeStart, stop: v.timeRangeStop)
|> filter(fn: (r) => r["_measurement"] == "airSensors")
|> filter(fn: (r) => r["sensor_id"] == "TLM0101") |> filter(fn: (r) => r["_field"] != "co") |> aggregateWindow(every:30s, fn: last, createEmpty:false)
yield(name:"if not A then last")
…drastically decreases performance for us (we are seing ~4 seconds versus ~1 minute). I am using InfluxDB v2.0.7
2 Likes
Hello @00000000000000000000,
Great! Yah storing top level/base queries in a variable that you reference will prevent you from querying for that data twice.
This might be helpful:
Hello @Anaisdg,
I’m confused. The docs recommend not to query twice and instead store the result in a variable. I re-ran my tests, but still: The recommended approach is considerably slower (1-2 minutes, compared to 4 seconds using the discouraged approach).
Hello @00000000000000000000,
I’m sorry I misread your previous reply. It sounds like could be that the advantages of the pushdown pattern are overweighing the disadvantages of querying for your data twice.