Unions are slow

I have two tables: T1 and T2. I pivot them, do additional filtering, and then experimental.unpivot() them so that they have the same schema. Then I apply |> group() to both of them. The “_value” column contains int64s exclusively. Producing these two tables takes a few seconds, and they contain tens of entries. I cannot union them. Or at least, not within 100 seconds, which is when I restarted the InfluxDB service. What am I doing wrong?

I see a few posts on union performance over the last few years with similar experiences but no resolutions.

Using InfluxDB 2.4 OSS

@Arjun Do you have an example query?

import "experimental"

vobcMsgs = from(bucket: "snooper")
  |> range(start: 2022-06-29T19:55:00Z, stop: 2022-09-29T20:20:00Z)
  |> filter(fn: (r) => r["_measurement"] == "vobc")

type1AResponses = vobcMsgs
  |> filter(fn: (r) => r["responseType"] == "type 1A")
  |> filter(fn: (r) => 
    (r["_field"] == "sms" and  r["_value"] > 0 and (r["_value"] < 256 or r["_value"] > 271)) or // skip line number reports
    (r["_field"] == "vhidr" and r["_value"] == 11)
  )
  |> pivot(rowKey: ["_time"], columnKey: ["_field"], valueColumn: "_value")
  |> filter(fn: (r) => 
    (r["sms"] > 0 and (r["sms"] < 256 or r["sms"] > 271)) and // skip line number reports
    (r["vhidr"] == 11)
  )
  |> keep(columns: ["_time", "sms"])
  |> experimental.unpivot()
  |> group()

type0Responses = vobcMsgs
  |> filter(fn: (r) => r["responseType"] == "type 0")
  |> filter(fn: (r) => 
    (r["_field"] == "vhidc" and r["_value"] == 11) or
    (r["_field"] == "alid") or
    (r["_field"] == "vhpn") or
    (r["_field"] == "va")
  )
  |> window(every: 1s)
  |> last()
  |> group()
  |> pivot(rowKey: ["_time"], columnKey: ["_field"], valueColumn: "_value")
  |> keep(columns: ["_time", "vcc", "vhidc", "alid", "vhpn", "va"])
  |> filter(fn: (r) => 
    r["vhidc"] == 11 and
    exists r["va"]
  )
  |> experimental.unpivot()
  |> group()

union(tables: [type0Responses, type1AResponses])
 |> yield()

@Arjun I’ve never seen this behavior in union(), but I’m not discounting it either. I’m wondering: instead of unioning the streams together and outputting a single yield, what if you just yielded both streams? (I also added some optimizations here.)

import "experimental"

vobcMsgs = () =>
    from(bucket: "snooper")
        |> range(start: 2022-06-29T19:55:00Z, stop: 2022-09-29T20:20:00Z)
        |> filter(fn: (r) => r["_measurement"] == "vobc")

type1AResponses =
    vobcMsgs()
        |> filter(fn: (r) => r["responseType"] == "type 1A")
        |> pivot(rowKey: ["_time"], columnKey: ["_field"], valueColumn: "_value")
        |> filter(
            fn: (r) =>
                r["sms"] > 0 and (r["sms"] < 256 or r["sms"] > 271) and // skip line number reports
                    (r["vhidr"] == 11),
        )
        |> keep(columns: ["_time", "sms"])
        |> experimental.unpivot()
        |> group()
        |> yield(name: "type1AResponses")

type0Responses =
    vobcMsgs()
        |> filter(fn: (r) => r["responseType"] == "type 0")
        |> filter(
            fn: (r) =>
                r["_field"] == "vhidc" and r["_value"] == 11 or r["_field"] == "alid" or r["_field"] == "vhpn"
                    or
                    r["_field"] == "va",
        )
        |> window(every: 1s)
        |> last()
        |> group()
        |> pivot(rowKey: ["_time"], columnKey: ["_field"], valueColumn: "_value")
        |> keep(
            columns: [
                "_time",
                "vcc",
                "vhidc",
                "alid",
                "vhpn",
                "va",
            ],
        )
        |> filter(fn: (r) => r["vhidc"] == 11 and exists r["va"])
        |> experimental.unpivot()
        |> group()
        |> yield(name: "type0Responses")

I need the union because I have to merge and sort the tables by time. However, when I take your optimized query, remove the yields, and add a union at the end, it works! Thanks! …But I can’t figure out what is functionally different between your query and mine.
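For reference, the working tail end looks roughly like this (a sketch; the sort() by _time is the merge-and-sort step I needed the union for):

union(tables: [type0Responses, type1AResponses])
    |> sort(columns: ["_time"])
    |> yield()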

@scott the optimized query helped me as well. Thanks. I’m also curious about what optimizations you did here. Especially the vobcMsgs = () => part. Could you explain, please?

Structuring this stream of data as a function rather than a variable is the Flux equivalent of what some would call a “thunk.” It delays the planning of push-down functions until the function is invoked.
For example, if I were to define the following variable, call that variable, and then follow it with other push-down-able functions:

exampleVar = 
    from(bucket: "example-bucket")
        |> range(start: -1d)
        |> filter(fn: (r) => r._measurement == "example-m")

exampleVar |> filter(fn: (r) => r._field == "example-f")

The push-downs would get scoped to the variable and wouldn’t continue on through to the next push-down-able function (the filter by field). So Flux would have to load all the data returned by the first filter into memory before processing the second filter.

By structuring the stream as a function, you delay the planning of push-downs until after the invocation of the function, which allows subsequent push-downs to be applied. For example:

exampleFn = () =>
    from(bucket: "example-bucket")
        |> range(start: -1d)
        |> filter(fn: (r) => r._measurement == "example-m")

exampleFn() |> filter(fn: (r) => r._field == "example-f")

This allows the planner to apply push-downs past the invocation of the function, so both filters get pushed down and only the data returned from the second filter needs to be loaded into memory.
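If you want to verify which operations actually get pushed down, the Flux profiler package can report this (a sketch reusing the example names above; pushed-down work shows up as storage-side read nodes in the operator profile):

import "profiler"

// Enable the query and operator profilers. The operator profile lists each
// operation with its runtime; filters that were pushed down appear merged
// into the storage read node rather than as separate in-memory operations.
option profiler.enabledProfilers = ["query", "operator"]

exampleFn = () =>
    from(bucket: "example-bucket")
        |> range(start: -1d)
        |> filter(fn: (r) => r._measurement == "example-m")

exampleFn() |> filter(fn: (r) => r._field == "example-f")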


Super cool. Will remember this.

My query time went from ~50 seconds to ~10 seconds after converting my data stream to a function. Would love to find out where I can find more “best practices” like this.