Query with lots of fields takes orders of magnitude longer in Flux than InfluxQL

I’ve been running this query on InfluxDB v1 and InfluxDB v2 (via the InfluxQL compatibility endpoint), and it takes a second or two.

SELECT
  mean("signal") * mean("fft_0001"),
  mean("signal") * mean("fft_0002"),
  mean("signal") * mean("fft_0003"),
  mean("signal") * mean("fft_0004"),
  ....
  mean("signal") * mean("fft_0510"),
  mean("signal") * mean("fft_0511"),
  mean("signal") * mean("fft_0512")
FROM "data"
WHERE "device" = 'XY' AND $timeFilter
GROUP BY time($__interval) fill(null)

But a similar query in Flux takes orders of magnitude longer and simply times out if I increase the time range. Is there a way to improve this query and bring it on par with the InfluxQL one?

import "strings"
filtered = from(bucket: v.defaultBucket)
  |> range(start: v.timeRangeStart, stop: v.timeRangeStop)
  |> filter(fn: (r) => r["_measurement"] == "data" and r["device"] == "XY")

signal = filtered
  |> filter(fn: (r) => r._field == "signal")
  |> aggregateWindow(every: v.windowPeriod, fn: mean, createEmpty: false)

fft = filtered
  |> filter(fn: (r) => strings.hasPrefix(v: r._field, prefix: "fft_"))
  |> aggregateWindow(every: v.windowPeriod, fn: mean, createEmpty: false)

join(tables:{signal:signal,fft:fft}, on:["_time"])
  |> map(fn: (r) =>({ _value: float(v:r._value_signal) * float(v:r._value_fft),  _time: r._time }))
  |> pivot(rowKey:["_time"], columnKey: ["_field"], valueColumn: "_value")

I’ve tried it on a beefier machine, and it’s still the same.

I also tried naming all the fields instead of using prefix: "fft_", and that takes even longer.

Hello @tintin,
I’m not sure. I’m sorry you’re having a bad experience. I’m sharing your question with the Flux team directly and I hope someone can provide some insight. Thank you.

Hi, I think the reason might be that your query isn’t utilizing the pushdown optimizations.

import "experimental"

// Use a function wrapper so we don't have to duplicate this and it will be registered
// in the runtime as two separate from calls instead of one. A potential future improvement
// is for the flux planner to recognize this pattern and do it automatically.
select = () => from(bucket: v.defaultBucket)
  |> range(start: v.timeRangeStart, stop: v.timeRangeStop)
  |> filter(fn: (r) => r["_measurement"] == "data" and r["device"] == "XY")

signal = select()
  |> filter(fn: (r) => r._field == "signal")
  |> aggregateWindow(every: v.windowPeriod, fn: mean, createEmpty: false)

fft = select()
  // regexes are specially understood while function calls are not, possible future optimization
  |> filter(fn: (r) => r._field =~ /^fft_/)
  |> aggregateWindow(every: v.windowPeriod, fn: mean, createEmpty: false)

join(tables:{signal:signal,fft:fft}, on:["_time"])
  |> map(fn: (r) =>({ _value: float(v:r._value_signal) * float(v:r._value_fft),  _time: r._time }))
  |> pivot(rowKey:["_time"], columnKey: ["_field"], valueColumn: "_value")

I think this query should perform better, as it forces the pushdowns to be used. Please give it a try, and I can tweak it further or open a performance issue if one of the other methods is what’s causing the problem. I also suspect that the pivot at the end doesn’t do anything, since the preceding map keeps only _time and _value and drops the _field column the pivot keys on, so you can probably safely remove it. If you can give me an idea of the output you expect, I can help you tweak that part to produce it.
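If you do want the pivot to produce one column per fft field, a sketch of one way to do it (assuming join suffixes the colliding columns as _field_signal / _field_fft, which is how join handles name collisions on non-join columns) is to carry the fft field name through the map:

```flux
join(tables: {signal: signal, fft: fft}, on: ["_time"])
  // keep the fft field name so the pivot has a columnKey to work with;
  // _field_fft is the suffixed copy of the fft stream's _field column
  |> map(fn: (r) => ({
      _time: r._time,
      _field: r._field_fft,
      _value: float(v: r._value_signal) * float(v: r._value_fft),
    }))
  |> pivot(rowKey: ["_time"], columnKey: ["_field"], valueColumn: "_value")
```

With _field preserved, the pivot turns each fft_xxxx product into its own column per timestamp instead of being a no-op.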


Hi @jonathan ,
I tried it and unfortunately there was no noticeable improvement.

The data I have is 1 Hz (one row per second).

The following two queries each run in less than a second, even when the range is as long as a month:

select = () => from(bucket: v.defaultBucket)
  |> range(start: v.timeRangeStart, stop: v.timeRangeStop)
  |> filter(fn: (r) => r["_measurement"] == "data" and r["device"] == "XY")

signal = select()
  |> filter(fn: (r) => r._field == "signal")
  |> aggregateWindow(every: v.windowPeriod, fn: mean, createEmpty: false)
  |> yield()

select = () => from(bucket: v.defaultBucket)
  |> range(start: v.timeRangeStart, stop: v.timeRangeStop)
  |> filter(fn: (r) => r["_measurement"] == "data" and r["device"] == "XY")

fft = select()
  |> filter(fn: (r) => r._field =~ /^fft_/)
  |> aggregateWindow(every: v.windowPeriod, fn: mean, createEmpty: false)
  |> yield()

As soon as I introduce join, the time shoots up.

join(tables:{signal:signal,fft:fft}, on:["_time"])
  |> yield()
| Range | Time |
| --- | --- |
| 5 minutes | 4 seconds |
| 15 minutes | 8 seconds |
| 1 hour | Times out (~60 seconds) |

InfluxQL times for comparison

| Range | Time |
| --- | --- |
| 5 minutes | Subsecond |
| 15 minutes | 1 second |
| 1 hour | 2 seconds |
| 1 month | 8 seconds |

I think this might unfortunately be a problem with the speed of join(). join() is probably the right function for what you want, since you’re joining one column against many others, but the amount of data seems to be what’s causing the problem.
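One join-free alternative worth trying (a sketch, not tested against your data): since signal and the fft_* fields live in the same measurement and series, you can aggregate them all in a single pushdown-eligible pass and pivot the fields into columns, then compute the products with a map. The two product columns in the map are illustrative only; Flux has no dynamic column arithmetic, so each fft_xxxx product would still have to be spelled out.

```flux
from(bucket: v.defaultBucket)
  |> range(start: v.timeRangeStart, stop: v.timeRangeStop)
  |> filter(fn: (r) => r._measurement == "data" and r.device == "XY")
  |> filter(fn: (r) => r._field == "signal" or r._field =~ /^fft_/)
  |> aggregateWindow(every: v.windowPeriod, fn: mean, createEmpty: false)
  // one row per window, with signal and each fft_* field as a column
  |> pivot(rowKey: ["_time"], columnKey: ["_field"], valueColumn: "_value")
  // each product must be written out explicitly; two shown as an example
  |> map(fn: (r) => ({
      _time: r._time,
      fft_0001: r.signal * r.fft_0001,
      fft_0002: r.signal * r.fft_0002,
    }))
```

This avoids join entirely, at the cost of an explicit column list in the final map.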

If it’s possible, do you think you can share any of the data you are using? It would allow me to run the queries myself and analyze where we can improve the performance.

Are you doing this work on Cloud 2.0? If so, I may have another idea for a query that might improve performance, but it requires a code change.

I am using OSS v2. I will try to get you the data on Monday.

Here is some data: https://we.tl/t-OipqqiF3rz

Tags: device and channel.
Fields: signal and fft_xxxx