Apply differing functions to the same stream of tables

I’ve got a measurement with values tagged either A or B. I want to process these values mostly the same except for a single operation that needs to be different (aggregate window functions “mean” or “last” depending on the tag value).

I do not want to use a union of distinct processing pipelines because I only need a single operation to be different and because of the (assumed) additional overhead.

I realize you can provide a function to aggregateWindow. In theory this would be perfect. However, I am unable to discriminate between measurement values based on tag values. What I’ve tried is this:

|> aggregateWindow(
    every: 300s,
    fn: (column, tables=<-) => if "_field" == "A" then mean(tables) else last(tables),
    createEmpty: false)

Is there a way to do this?

If the above approach doesn’t work, is there an alternative approach that does?

Hello @00000000000000000000,
Welcome!
I have a sneaking suspicion that you might just want to use multiple yield() functions like:

mydata = from(bucket: "airsensor")
  |> range(start: v.timeRangeStart, stop: v.timeRangeStop)
  |> filter(fn: (r) => r["_measurement"] == "airSensors")
  |> filter(fn: (r) => r["sensor_id"] == "TLM0101")

co = mydata |> filter(fn: (r) => r["_field"] == "co") |> aggregateWindow(every: 30s, fn: mean, createEmpty: false)
co |> yield(name: "if A then mean")
// won't yield results unless another field exists

humidity = mydata |> filter(fn: (r) => r["_field"] != "co") |> aggregateWindow(every: 30s, fn: last, createEmpty: false)
humidity |> yield(name: "if not A then last")
// won't yield results unless another field exists

Does that do what you want? Or am I misunderstanding?

Thank you @Anaisdg,

You are correct. However, multiple yield() calls do not appear to be any faster (or slower) than a single union(), and we were hoping to reduce query time.
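
For reference, the single-union() variant I compared against looks roughly like the following. This is a sketch that reuses the bucket, measurement, and field names from the example above; the variable names meanPart and lastPart and the yield name are placeholders.

```flux
mydata = from(bucket: "airsensor")
  |> range(start: v.timeRangeStart, stop: v.timeRangeStop)
  |> filter(fn: (r) => r["_measurement"] == "airSensors")
  |> filter(fn: (r) => r["sensor_id"] == "TLM0101")

// "co" is aggregated with mean ...
meanPart = mydata
  |> filter(fn: (r) => r["_field"] == "co")
  |> aggregateWindow(every: 30s, fn: mean, createEmpty: false)

// ... and every other field with last
lastPart = mydata
  |> filter(fn: (r) => r["_field"] != "co")
  |> aggregateWindow(every: 30s, fn: last, createEmpty: false)

// Merge both streams into a single result
union(tables: [meanPart, lastPart])
  |> yield(name: "mean or last by field")
```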

On that note: writing…

mydata = from(bucket: "airsensor")
  |> range(start: v.timeRangeStart, stop: v.timeRangeStop)
  |> filter(fn: (r) => r["_measurement"] == "airSensors")
  |> filter(fn: (r) => r["sensor_id"] == "TLM0101")

co = mydata |> filter(fn: (r) => r["_field"] == "co") |> aggregateWindow(every: 30s, fn: mean, createEmpty: false)
co |> yield(name: "if A then mean")

humidity = mydata |> filter(fn: (r) => r["_field"] != "co") |> aggregateWindow(every: 30s, fn: last, createEmpty: false)
humidity |> yield(name: "if not A then last")

…instead of…

co = from(bucket: "airsensor")
  |> range(start: v.timeRangeStart, stop: v.timeRangeStop)
  |> filter(fn: (r) => r["_measurement"] == "airSensors")
  |> filter(fn: (r) => r["sensor_id"] == "TLM0101")
  |> filter(fn: (r) => r["_field"] == "co")
  |> aggregateWindow(every: 30s, fn: mean, createEmpty: false)
co |> yield(name: "if A then mean")

humidity = from(bucket: "airsensor")
  |> range(start: v.timeRangeStart, stop: v.timeRangeStop)
  |> filter(fn: (r) => r["_measurement"] == "airSensors")
  |> filter(fn: (r) => r["sensor_id"] == "TLM0101")
  |> filter(fn: (r) => r["_field"] != "co")
  |> aggregateWindow(every: 30s, fn: last, createEmpty: false)
humidity |> yield(name: "if not A then last")

…drastically decreases performance for us: the first version (shared variable) takes ~1 minute, while the second (two separate queries) takes ~4 seconds. I am using InfluxDB v2.0.7.

Hello @00000000000000000000,
Great! Yes, storing a top-level base query in a variable that you then reference should prevent you from querying for that data twice.
This might be helpful:

Hello @Anaisdg,

I’m confused. The docs recommend not to query twice and instead store the result in a variable. I re-ran my tests, but still: The recommended approach is considerably slower (1-2 minutes, compared to 4 seconds using the discouraged approach).

Hello @00000000000000000000,
I’m sorry, I misread your previous reply. It sounds like the advantages of the pushdown pattern could be outweighing the disadvantages of querying for your data twice.
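
One way to check this is the Flux profiler, which adds per-query and per-operator timings to the output. Below is a minimal sketch, assuming the Flux version bundled with your InfluxDB build includes the profiler package. It profiles one branch of the duplicated version; run it again with the shared-variable version in place of the pipeline and compare the two profiles.

```flux
import "profiler"

// Append profiling tables (overall query stats and per-operator stats) to the results
option profiler.enabledProfilers = ["query", "operator"]

// One unbroken from |> range |> filter |> aggregateWindow chain
from(bucket: "airsensor")
  |> range(start: v.timeRangeStart, stop: v.timeRangeStop)
  |> filter(fn: (r) => r["_measurement"] == "airSensors")
  |> filter(fn: (r) => r["sensor_id"] == "TLM0101")
  |> filter(fn: (r) => r["_field"] == "co")
  |> aggregateWindow(every: 30s, fn: mean, createEmpty: false)
  |> yield(name: "if A then mean")
```

If the pushdown is what makes the difference, I would expect the operator profile of the duplicated version to show the filtering and windowing folded into the storage read, and the shared-variable version to show them as separate, slower operators. That is a guess on my part until you see the actual profiles.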