Unions are slow

I have two tables, T1 and T2. I pivot them, do some additional filtering, and then call experimental.unpivot() on them so that they have the same schema. Then I apply |> group() to both of them. The “_value” column contains only int64s. Producing these two tables takes a few seconds, and they contain only tens of entries. Yet I cannot union them, or at least not within 100 seconds, which is when I restarted the InfluxDB service. What am I doing wrong?

I’ve found a few posts from the last few years describing similar union performance problems, but none of them has a resolution.

I’m using InfluxDB 2.4 OSS.

@Arjun Do you have an example query?

import "experimental"

vobcMsgs = from(bucket: "snooper")
  |> range(start: 2022-06-29T19:55:00Z, stop: 2022-09-29T20:20:00Z)
  |> filter(fn: (r) => r["_measurement"] == "vobc")

type1AResponses = vobcMsgs
  |> filter(fn: (r) => r["responseType"] == "type 1A")
  |> filter(fn: (r) => 
    (r["_field"] == "sms" and  r["_value"] > 0 and (r["_value"] < 256 or r["_value"] > 271)) or // skip line number reports
    (r["_field"] == "vhidr" and r["_value"] == 11)
  )
  |> pivot(rowKey: ["_time"], columnKey: ["_field"], valueColumn: "_value")
  |> filter(fn: (r) => 
    (r["sms"] > 0 and (r["sms"] < 256 or r["sms"] > 271)) and // skip line number reports
    (r["vhidr"] == 11)
  )
  |> keep(columns: ["_time", "sms"])
  |> experimental.unpivot()
  |> group()

type0Responses = vobcMsgs
  |> filter(fn: (r) => r["responseType"] == "type 0")
  |> filter(fn: (r) => 
    (r["_field"] == "vhidc" and r["_value"] == 11) or
    (r["_field"] == "alid") or
    (r["_field"] == "vhpn") or
    (r["_field"] == "va")
  )
  |> window(every: 1s)
  |> last()
  |> group()
  |> pivot(rowKey: ["_time"], columnKey: ["_field"], valueColumn: "_value")
  |> keep(columns: ["_time", "vcc", "vhidc", "alid", "vhpn", "va"])
  |> filter(fn: (r) => 
    r["vhidc"] == 11 and
    exists r["va"]
  )
  |> experimental.unpivot()
  |> group()

union(tables: [type0Responses, type1AResponses])
  |> yield()

@Arjun I haven’t seen this behavior with union() before, but I’m not ruling it out either. Instead of unioning the streams and outputting a single yield, what happens if you just yield both streams? (I also added some optimizations here.)

import "experimental"

vobcMsgs = () =>
    from(bucket: "snooper")
        |> range(start: 2022-06-29T19:55:00Z, stop: 2022-09-29T20:20:00Z)
        |> filter(fn: (r) => r["_measurement"] == "vobc")

type1AResponses =
    vobcMsgs()
        |> filter(fn: (r) => r["responseType"] == "type 1A")
        |> pivot(rowKey: ["_time"], columnKey: ["_field"], valueColumn: "_value")
        |> filter(
            fn: (r) =>
                r["sms"] > 0 and (r["sms"] < 256 or r["sms"] > 271) and // skip line number reports
                    (r["vhidr"] == 11),
        )
        |> keep(columns: ["_time", "sms"])
        |> experimental.unpivot()
        |> group()
        |> yield(name: "type1AResponses")

type0Responses =
    vobcMsgs()
        |> filter(fn: (r) => r["responseType"] == "type 0")
        |> filter(
            fn: (r) =>
                (r["_field"] == "vhidc" and r["_value"] == 11)
                    or r["_field"] == "alid"
                    or r["_field"] == "vhpn"
                    or r["_field"] == "va",
        )
        |> window(every: 1s)
        |> last()
        |> group()
        |> pivot(rowKey: ["_time"], columnKey: ["_field"], valueColumn: "_value")
        |> keep(
            columns: [
                "_time",
                "vcc",
                "vhidc",
                "alid",
                "vhpn",
                "va",
            ],
        )
        |> filter(fn: (r) => r["vhidc"] == 11 and exists r["va"])
        |> experimental.unpivot()
        |> group()
        |> yield(name: "type0Responses")

I need the union because I have to merge the tables and sort them by time. However, when I take your optimized query, remove the yields, and add a union at the end, it works! Thanks! …But I can’t figure out what is functionally different between your query and mine.
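The merge-and-sort step can be checked on its own, away from storage. The following is a minimal sketch that assumes both results share the `_time`, `_field`, `_value` schema produced by `experimental.unpivot()`. It uses `array.from()` with hypothetical, made-up rows in place of the `snooper` bucket. The Flux documentation notes that `union()` does not preserve row order, so the sketch regroups and sorts explicitly after the union:

```flux
import "array"

// Hypothetical rows standing in for the two unpivoted results.
// The values are invented for illustration and do not come from the snooper bucket.
type1A = array.from(
    rows: [
        {_time: 2022-06-29T19:57:00Z, _field: "sms", _value: 12},
        {_time: 2022-06-29T19:55:00Z, _field: "sms", _value: 300},
    ],
)

type0 = array.from(
    rows: [
        {_time: 2022-06-29T19:56:00Z, _field: "va", _value: 5},
        {_time: 2022-06-29T19:58:00Z, _field: "vhidc", _value: 11},
    ],
)

// union() concatenates the input streams but does not guarantee row order,
// so collapse everything into one table and sort it by time.
union(tables: [type0, type1A])
    |> group()
    |> sort(columns: ["_time"])
    |> yield(name: "merged")
```

If you swap the `array.from()` tables for `type0Responses` and `type1AResponses` (without their trailing `yield()` calls), you should get the merged, time-ordered output described above.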